Skip to content

Commit

Permalink
fix: local wal perform different with remote wal (#39967)
Browse files Browse the repository at this point in the history
issue: #38399

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Feb 19, 2025
1 parent f47320e commit fd701ec
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 6 deletions.
6 changes: 3 additions & 3 deletions internal/streamingnode/client/handler/handler_client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (hc *handlerClientImpl) GetLatestMVCCTimestampIfLocal(ctx context.Context,
}

// Get the wal at local registry.
w, err := registry.GetAvailableWAL(assign.Channel)
w, err := registry.GetLocalAvailableWAL(assign.Channel)
if err != nil {
return 0, err
}
Expand All @@ -71,7 +71,7 @@ func (hc *handlerClientImpl) CreateProducer(ctx context.Context, opts *ProducerO

p, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil {
return localResult(localWAL), nil
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO

c, err := hc.createHandlerAfterStreamingNodeReady(ctx, opts.PChannel, func(ctx context.Context, assign *types.PChannelInfoAssigned) (*handlerCreateResult, error) {
// Check if the localWAL is assigned at local
localWAL, err := registry.GetAvailableWAL(assign.Channel)
localWAL, err := registry.GetLocalAvailableWAL(assign.Channel)
if err == nil {
localScanner, err := localWAL.Read(ctx, wal.ReadOption{
VChannel: opts.VChannel,
Expand Down
20 changes: 20 additions & 0 deletions internal/streamingnode/client/handler/registry/is_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package registry

var (
_ isLocal = localWAL{}
_ isLocal = localScanner{}
)

// localTrait is used to make isLocal can only be implemented by current package.
type localTrait struct{}

// isLocal is a hint interface for local wal.
type isLocal interface {
isLocal() localTrait
}

// IsLocal checks if the component is local.
func IsLocal(component any) bool {
_, ok := component.(isLocal)
return ok
}
50 changes: 47 additions & 3 deletions internal/streamingnode/client/handler/registry/wal_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
Expand All @@ -28,12 +29,17 @@ func RegisterLocalWALManager(manager WALManager) {
log.Ctx(context.Background()).Info("register local wal manager done")
}

// GetAvailableWAL returns a available wal instance for the channel.
func GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
// GetLocalAvailableWAL returns a available wal instance for the channel.
func GetLocalAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
if !paramtable.IsLocalComponentEnabled(typeutil.StreamingNodeRole) {
return nil, ErrNoStreamingNodeDeployed
}
return registry.Get().GetAvailableWAL(channel)
l, err := registry.Get().GetAvailableWAL(channel)
if err != nil {
return nil, err
}
return localWAL{l}, nil // because the appended message object may be reused, make some difference between remote wal and local wal.
// so make a copy before appending for local wal to keep the consistency.
}

// WALManager is a hint type for wal manager at streaming node.
Expand All @@ -42,3 +48,41 @@ type WALManager interface {
// Return nil if the wal instance is not found.
GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error)
}

// localWAL is a hint type for local wal.
type localWAL struct {
wal.WAL
}

func (l localWAL) isLocal() localTrait {
return localTrait{}
}

// Append writes a record to the log.
func (l localWAL) Append(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
msg = message.CloneMutableMessage(msg)
return l.WAL.Append(ctx, msg)
}

// Append a record to the log asynchronously.
func (l localWAL) AppendAsync(ctx context.Context, msg message.MutableMessage, cb func(*types.AppendResult, error)) {
msg = message.CloneMutableMessage(msg)
l.WAL.AppendAsync(ctx, msg, cb)
}

func (l localWAL) Read(ctx context.Context, deliverPolicy wal.ReadOption) (wal.Scanner, error) {
s, err := l.WAL.Read(ctx, deliverPolicy)
if err != nil {
return nil, err
}
return localScanner{s}, nil
}

// localScanner is a hint type for local wal scanner.
type localScanner struct {
wal.Scanner
}

func (s localScanner) isLocal() localTrait {
return localTrait{}
}
55 changes: 55 additions & 0 deletions internal/streamingnode/client/handler/registry/wal_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package registry

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type mockWALManager struct {
t *testing.T
}

func (m *mockWALManager) GetAvailableWAL(channel types.PChannelInfo) (wal.WAL, error) {
l := mock_wal.NewMockWAL(m.t)
l.EXPECT().Append(mock.Anything, mock.Anything).Return(&types.AppendResult{}, nil)
l.EXPECT().AppendAsync(mock.Anything, mock.Anything, mock.Anything).Return()
l.EXPECT().Read(mock.Anything, mock.Anything).Return(mock_wal.NewMockScanner(m.t), nil)
return l, nil
}

func TestGetLocalAvailableWAL(t *testing.T) {
paramtable.Init()
paramtable.SetLocalComponentEnabled(typeutil.StreamingNodeRole)

manager := &mockWALManager{t: t}
RegisterLocalWALManager(manager)

walInstance, err := GetLocalAvailableWAL(types.PChannelInfo{})
assert.NoError(t, err)
assert.NotNil(t, walInstance)
assert.True(t, IsLocal(walInstance))

msg, _ := message.NewTimeTickMessageBuilderV1().
WithAllVChannel().
WithHeader(&message.TimeTickMessageHeader{}).
WithBody(&msgpb.TimeTickMsg{}).
BuildMutable()
walInstance.Append(context.Background(), msg)
walInstance.AppendAsync(context.Background(), msg, func(ar *wal.AppendResult, err error) {})

s, err := walInstance.Read(context.Background(), wal.ReadOption{})
assert.NoError(t, err)
assert.NotNil(t, walInstance)
assert.True(t, IsLocal(s))
}
12 changes: 12 additions & 0 deletions pkg/streaming/util/message/message_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ func (m *messageImpl) SplitIntoMutableMessage() []MutableMessage {
return msgs
}

// CloneMutableMessage clones the current mutable message.
func CloneMutableMessage(msg MutableMessage) MutableMessage {
if msg == nil {
return nil
}
inner := msg.(*messageImpl)
return &messageImpl{
payload: inner.payload,
properties: inner.properties.Clone(),
}
}

type immutableMessageImpl struct {
messageImpl
id MessageID
Expand Down

0 comments on commit fd701ec

Please sign in to comment.