diff --git a/rpcserver.go b/rpcserver.go index 5877410555..72b6a3d883 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7402,24 +7402,9 @@ func (r *rpcServer) AddFederationServer(ctx context.Context, ) (*unirpc.AddFederationServerResponse, error) { serversToAdd := fn.Map(req.Servers, unmarshalUniverseServer) - - for idx := range serversToAdd { - server := serversToAdd[idx] - - // Before we add the server as a federation member, we check - // that we can actually connect to it and that it isn't - // ourselves. - err := CheckFederationServer( - r.cfg.RuntimeID, universe.DefaultTimeout, server, - ) - if err != nil { - return nil, err - } - } - - err := r.cfg.UniverseFederation.AddServer(serversToAdd...) + _, err := r.cfg.UniverseFederation.AddServer(ctx, true, serversToAdd...) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to add server: %w", err) } return &unirpc.AddFederationServerResponse{}, nil diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 4de28028e2..1b2d455e53 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -151,7 +151,6 @@ type Querier interface { InsertSupplySyncerPushLog(ctx context.Context, arg InsertSupplySyncerPushLogParams) error InsertSupplyUpdateEvent(ctx context.Context, arg InsertSupplyUpdateEventParams) error InsertTxProof(ctx context.Context, arg InsertTxProofParams) error - InsertUniverseServer(ctx context.Context, arg InsertUniverseServerParams) error LinkDanglingSupplyUpdateEvents(ctx context.Context, arg LinkDanglingSupplyUpdateEventsParams) error LogProofTransferAttempt(ctx context.Context, arg LogProofTransferAttemptParams) error LogServerSync(ctx context.Context, arg LogServerSyncParams) error @@ -264,6 +263,9 @@ type Querier interface { UpsertTapscriptTreeRootHash(ctx context.Context, arg UpsertTapscriptTreeRootHashParams) (int64, error) UpsertUniverseLeaf(ctx context.Context, arg UpsertUniverseLeafParams) error UpsertUniverseRoot(ctx context.Context, arg UpsertUniverseRootParams) (int64, error) + // Upserts a universe server by inserting or updating the last sync time for a + // given server host. + UpsertUniverseServer(ctx context.Context, arg UpsertUniverseServerParams) error UpsertUniverseSupplyLeaf(ctx context.Context, arg UpsertUniverseSupplyLeafParams) (int64, error) UpsertUniverseSupplyRoot(ctx context.Context, arg UpsertUniverseSupplyRootParams) (int64, error) } diff --git a/tapdb/sqlc/queries/universe.sql b/tapdb/sqlc/queries/universe.sql index 36d65cdcab..8ede220f97 100644 --- a/tapdb/sqlc/queries/universe.sql +++ b/tapdb/sqlc/queries/universe.sql @@ -99,12 +99,16 @@ ORDER BY CASE WHEN sqlc.narg('sort_direction') = 1 THEN universe_roots.id END DESC LIMIT @num_limit OFFSET @num_offset; --- name: InsertUniverseServer :exec +-- name: UpsertUniverseServer :exec +-- Upserts a universe server by inserting or updating the last sync time for a +-- given server host. INSERT INTO universe_servers( server_host, last_sync_time ) VALUES ( - @server_host, @last_sync_time -); + @server_host, @last_sync_time + ) +ON CONFLICT(server_host) + DO UPDATE SET last_sync_time = EXCLUDED.last_sync_time; -- name: DeleteUniverseServer :exec DELETE FROM universe_servers diff --git a/tapdb/sqlc/universe.sql.go b/tapdb/sqlc/universe.sql.go index 68be1f9a7c..5238d7ef03 100644 --- a/tapdb/sqlc/universe.sql.go +++ b/tapdb/sqlc/universe.sql.go @@ -324,24 +324,6 @@ func (q *Queries) InsertNewSyncEvent(ctx context.Context, arg InsertNewSyncEvent return err } -const InsertUniverseServer = `-- name: InsertUniverseServer :exec -INSERT INTO universe_servers( - server_host, last_sync_time -) VALUES ( - $1, $2 -) -` - -type InsertUniverseServerParams struct { - ServerHost string - LastSyncTime time.Time -} - -func (q *Queries) InsertUniverseServer(ctx context.Context, arg InsertUniverseServerParams) error { - _, err := q.db.ExecContext(ctx, InsertUniverseServer, arg.ServerHost, arg.LastSyncTime) - return err -} - const LogServerSync = `-- name: LogServerSync :exec UPDATE universe_servers SET last_sync_time = $1 @@ -1360,3 +1342,25 @@ func (q *Queries) UpsertUniverseRoot(ctx context.Context, arg UpsertUniverseRoot err := row.Scan(&id) return id, err } + +const UpsertUniverseServer = `-- name: UpsertUniverseServer :exec +INSERT INTO universe_servers( + server_host, last_sync_time +) VALUES ( + $1, $2 + ) +ON CONFLICT(server_host) + DO UPDATE SET last_sync_time = EXCLUDED.last_sync_time +` + +type UpsertUniverseServerParams struct { + ServerHost string + LastSyncTime time.Time +} + +// Upserts a universe server by inserting or updating the last sync time for a +// given server host. +func (q *Queries) UpsertUniverseServer(ctx context.Context, arg UpsertUniverseServerParams) error { + _, err := q.db.ExecContext(ctx, UpsertUniverseServer, arg.ServerHost, arg.LastSyncTime) + return err +} diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index 054a136c9a..b3ee8600ac 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "errors" "fmt" "sort" "sync/atomic" @@ -39,7 +38,7 @@ type ( ProofSyncLogEntry = sqlc.QueryFederationProofSyncLogRow // NewUniverseServer is used to create a new universe server. - NewUniverseServer = sqlc.InsertUniverseServerParams + NewUniverseServer = sqlc.UpsertUniverseServerParams // DelUniverseServer is used to delete a universe server. DelUniverseServer = sqlc.DeleteUniverseServerParams @@ -131,8 +130,8 @@ type UniverseServerStore interface { FederationSyncConfigStore FederationProofSyncLogStore - // InsertUniverseServer inserts a new universe server in to the DB. - InsertUniverseServer(ctx context.Context, arg NewUniverseServer) error + // UpsertUniverseServer upserts a new universe server in to the DB. + UpsertUniverseServer(ctx context.Context, arg NewUniverseServer) error // DeleteUniverseServer removes a universe server from the store. DeleteUniverseServer(ctx context.Context, r DelUniverseServer) error @@ -249,19 +248,14 @@ func (u *UniverseFederationDB) AddServers(ctx context.Context, return fn.ForEachErr(addrs, func(a universe.ServerAddr) error { addr := NewUniverseServer{ ServerHost: a.HostStr(), - LastSyncTime: time.Now(), + LastSyncTime: time.Now().UTC(), } - return db.InsertUniverseServer(ctx, addr) + return db.UpsertUniverseServer(ctx, addr) }) }) if err != nil { - // Add context to unique constraint errors. - var uniqueConstraintErr *ErrSqlUniqueConstraintViolation - if errors.As(err, &uniqueConstraintErr) { - return universe.ErrDuplicateUniverse - } - - return err + return fmt.Errorf("failed to upsert universe server addr: %w", + err) } return nil diff --git a/tapdb/universe_federation_test.go b/tapdb/universe_federation_test.go index ea2751e970..9373eaf34c 100644 --- a/tapdb/universe_federation_test.go +++ b/tapdb/universe_federation_test.go @@ -50,10 +50,10 @@ func TestUniverseFederationCRUD(t *testing.T) { // Next, we'll try to add a new series of servers to the DB. addrs := db.AddRandomServerAddrs(t, 10) - // If we try to insert them all again, then we should get an error as - // we ensure the host names are unique. + // Re-inserting the same addresses should not cause an error, since they + // are expected to be upserted. err = fedDB.AddServers(ctx, addrs...) - require.ErrorIs(t, err, universe.ErrDuplicateUniverse) + require.NoError(t, err) // Next, we should be able to fetch all the active hosts. dbAddrs, err := fedDB.UniverseServers(ctx) diff --git a/universe/federation.go b/universe/federation.go index f17620ee8a..4eaae15e90 100644 --- a/universe/federation.go +++ b/universe/federation.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -20,6 +21,13 @@ const ( DefaultTimeout = 30 * time.Second ) +var ( + // ErrUniConnFailed is returned when we fail to connect to a remote + // universe server. + ErrUniConnFailed = errors.New("connection to remote universe " + + "server failed") +) + // FederationConfig is a config that the FederationEnvoy will use to // synchronize new updates between the current set of federated Universe nodes. type FederationConfig struct { @@ -139,26 +147,15 @@ func (f *FederationEnvoy) Start() error { f.startOnce.Do(func() { log.Infof("Starting FederationEnvoy") + ctx, cancel := f.WithCtxQuit() + defer cancel() + // Before we start the main goroutine, we'll add the set of // static Universe servers. addrs := f.cfg.StaticFederationMembers serverAddrs := fn.Map(addrs, NewServerAddrFromStr) - serverAddrs = fn.Filter(serverAddrs, func(a ServerAddr) bool { - // Before we add the server as a federation member, we - // check that we can actually connect to it and that it - // isn't ourselves. - if err := f.cfg.ServerChecker(a); err != nil { - log.Warnf("Not adding server to federation: %v", - err) - - return false - } - - return true - }) - - err := f.AddServer(serverAddrs...) + _, err := f.AddServer(ctx, true, serverAddrs...) // On restart, we'll get an error for universe servers already // inserted in our DB, since we can't store duplicates. // We can safely ignore that error. @@ -703,20 +700,161 @@ func (f *FederationEnvoy) UpsertProofLeafBatch(_ context.Context, return err } +// ServerAddReport contains the result of attempting to add a server to the +// federation. +type ServerAddReport struct { + // Addr is the address of the server being added. + Addr ServerAddr + + // ConnectionSuccess indicates if the connection to the server was + // successful. If connection checks were disabled, this will be None. + ConnectionSuccess fn.Option[bool] + + // KnownServer is true if the server was already known to the + // federation. + KnownServer bool + + // Error contains any error encountered while adding the server. + Error error +} + // AddServer adds a new set of servers to the federation, then immediately // performs a new background sync. -func (f *FederationEnvoy) AddServer(addrs ...ServerAddr) error { - ctx, cancel := f.WithCtxQuit() - defer cancel() +func (f *FederationEnvoy) AddServer(ctx context.Context, connectionCheck bool, + addrsArg ...ServerAddr) ([]ServerAddReport, error) { + + // Make the addrs set unique. + var uniqueAddrsMap = make(map[string]struct{}) + var reports []ServerAddReport + for idx := range addrsArg { + addr := addrsArg[idx] + addrStr := addr.HostStr() + + if _, exists := uniqueAddrsMap[addrStr]; !exists { + reports = append(reports, ServerAddReport{ + Addr: addr, + }) + } - log.Infof("Adding new Universe server to Federation, addrs=%v", - spew.Sdump(addrs)) + uniqueAddrsMap[addrStr] = struct{}{} + } - if err := f.cfg.FederationDB.AddServers(ctx, addrs...); err != nil { - return err + log.Debugf("Attempting to add %d unique server(s) to federation", + len(reports)) + + var instanceErrors map[int]error + if connectionCheck { + // Before we add the new server to our set of federation + // members, we'll ensure that we can actually connect to + // them. + // + // Define a connection check function that we'll use to + // validate each server in parallel. + connCheck := func(_ context.Context, + addReport ServerAddReport) error { + + err := f.cfg.ServerChecker(addReport.Addr) + if err != nil { + addrStr := addReport.Addr.HostStr() + return fmt.Errorf("%s: %w", addrStr, + ErrUniConnFailed) + } + + return nil + } + + // Run the connection checks in parallel. + var err error + instanceErrors, err = fn.ParSliceErrCollect( + ctx, reports, connCheck, + ) + if err != nil { + return nil, fmt.Errorf("unable to validate server "+ + "connections: %w", err) + } + + // Update reports with connection results. + for idx := range reports { + if errInstance, exists := instanceErrors[idx]; exists { + reports[idx].ConnectionSuccess = fn.Some(false) + reports[idx].Error = errInstance + continue + } + + reports[idx].ConnectionSuccess = fn.Some(true) + } + } + + // Get a full list of existing servers to check for duplicates. + knownServers, err := f.cfg.FederationDB.UniverseServers(ctx) + if err != nil { + return nil, fmt.Errorf("unable to query existing servers: %w", + err) + } + + knownServersSet := fn.NewSet[string]( + fn.Map(knownServers, func(a ServerAddr) string { + return a.HostStr() + })..., + ) + + // Mark known servers in the reports. + for idx := range reports { + addReport := reports[idx] + + reports[idx].KnownServer = + knownServersSet.Contains(addReport.Addr.HostStr()) + } + + // Filter out candidate servers that either failed the connection check + // or are already known. + var filteredAddrs []ServerAddr + for idx := range reports { + report := reports[idx] + + if report.ConnectionSuccess == fn.Some(false) { + log.Debugf("Skipping server add to db, connection "+ + "check failed: %s", report.Addr.HostStr()) + continue + } + + if report.KnownServer { + log.Debugf("Skipping server add to db, already a "+ + "known server: %s", report.Addr.HostStr()) + continue + } + + filteredAddrs = append(filteredAddrs, report.Addr) + } + + if len(filteredAddrs) == 0 { + log.Infof("No new Universe servers to add to Federation") + return reports, nil + } + + log.Infof("Adding new Universe server(s) to Federation, addrs:\n%v", + strings.Join(fn.Map(filteredAddrs, func(a ServerAddr) string { + return "- " + a.HostStr() + }), "\n")) + + // Add the filtered set of universe servers to the federation database. + // + // Note: This operation may still encounter a duplicate universe error + // because the known universe server set is queried in a separate + // database transaction. + err = f.cfg.FederationDB.AddServers(ctx, filteredAddrs...) + if err != nil { + return nil, fmt.Errorf("failed to add servers to federation "+ + "db: %w", err) + } + + err = f.SyncServers(filteredAddrs) + if err != nil { + return nil, fmt.Errorf("unable to sync with new servers: %w", + err) } - return f.SyncServers(addrs) + return reports, nil } // QuerySyncConfigs returns the current sync configs for the federation. diff --git a/universe/federation_test.go b/universe/federation_test.go new file mode 100644 index 0000000000..ca15eace03 --- /dev/null +++ b/universe/federation_test.go @@ -0,0 +1,231 @@ +package universe + +import ( + "context" + "errors" + "testing" + + "github.com/lightninglabs/taproot-assets/fn" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// TestFederationEnvoyAddServer tests the AddServer method of FederationEnvoy. +func TestFederationEnvoyAddServer(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + t.Run("successfully add new servers", func(t *testing.T) { + t.Parallel() + + mockDB := &MockFederationDB{} + mockSyncer := &MockSyncer{} + + // Set up expectations + mockDB.On("UniverseServers", mock.Anything).Return( + []ServerAddr{}, nil) + + expectedAddrs := []ServerAddr{ + NewServerAddrFromStr("server1.example.com"), + NewServerAddrFromStr("server2.example.com"), + } + mockDB.On("AddServers", mock.Anything, + expectedAddrs).Return(nil) + + // Mock the QueryFederationSyncConfigs call used by SyncServers + mockDB.On("QueryFederationSyncConfigs", mock.Anything).Return( + []*FedGlobalSyncConfig(nil), []*FedUniSyncConfig(nil), + nil, + ) + + mockSyncer.On("SyncUniverse", mock.Anything, mock.Anything, + mock.Anything, mock.Anything, mock.Anything).Return( + []AssetSyncDiff(nil), nil) + + successfulServerChecker := func(addr ServerAddr) error { + // Simulate successful connection check. + return nil + } + + envoy := &FederationEnvoy{ + cfg: FederationConfig{ + FederationDB: mockDB, + UniverseSyncer: mockSyncer, + ServerChecker: successfulServerChecker, + }, + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, + } + + addrs := []ServerAddr{ + NewServerAddrFromStr("server1.example.com"), + NewServerAddrFromStr("server2.example.com"), + } + + reports, err := envoy.AddServer(ctx, true, addrs...) + require.NoError(t, err) + require.Len(t, reports, 2) + + // Verify the reports. + for _, report := range reports { + require.Equal( + t, fn.Some(true), report.ConnectionSuccess, + ) + require.False(t, report.KnownServer) + require.NoError(t, report.Error) + } + + // Verify all expectations were met + mockDB.AssertExpectations(t) + mockSyncer.AssertExpectations(t) + }) + + t.Run("connection check disabled", func(t *testing.T) { + t.Parallel() + + mockDB := &MockFederationDB{} + mockSyncer := &MockSyncer{} + + // Set up expectations + mockDB.On("UniverseServers", mock.Anything).Return( + []ServerAddr{}, nil) + + expectedAddr := NewServerAddrFromStr("server.example.com") + mockDB.On("AddServers", mock.Anything, + []ServerAddr{expectedAddr}).Return(nil) + + // Mock the QueryFederationSyncConfigs call used by SyncServers + mockDB.On("QueryFederationSyncConfigs", mock.Anything).Return( + []*FedGlobalSyncConfig(nil), []*FedUniSyncConfig(nil), + nil, + ) + + mockSyncer.On("SyncUniverse", mock.Anything, mock.Anything, + mock.Anything, mock.Anything, mock.Anything).Return( + []AssetSyncDiff(nil), nil) + + shouldNotBeCalledServerChecker := func(addr ServerAddr) error { + t.Fatal("ServerChecker should not be called when " + + "connection check is disabled") + return nil + } + + envoy := &FederationEnvoy{ + cfg: FederationConfig{ + FederationDB: mockDB, + UniverseSyncer: mockSyncer, + ServerChecker: shouldNotBeCalledServerChecker, + }, + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, + } + + reports, err := envoy.AddServer(ctx, false, expectedAddr) + require.NoError(t, err) + require.Len(t, reports, 1) + + // Verify the report when connection check is disabled. + report := reports[0] + require.Equal(t, fn.None[bool](), report.ConnectionSuccess) + require.False(t, report.KnownServer) + require.NoError(t, report.Error) + + // Verify all expectations were met + mockDB.AssertExpectations(t) + mockSyncer.AssertExpectations(t) + }) + + t.Run("connection check fails", func(t *testing.T) { + t.Parallel() + + mockDB := &MockFederationDB{} + mockSyncer := &MockSyncer{} + + // Set up expectations - only UniverseServers should be called + mockDB.On("UniverseServers", mock.Anything).Return( + []ServerAddr{}, nil) + // AddServers and SyncUniverse should NOT be called + + failingServerChecker := func(addr ServerAddr) error { + return errors.New("connection failed") + } + + envoy := &FederationEnvoy{ + cfg: FederationConfig{ + FederationDB: mockDB, + UniverseSyncer: mockSyncer, + ServerChecker: failingServerChecker, + }, + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, + } + + addr := NewServerAddrFromStr("unreachable.example.com") + reports, err := envoy.AddServer(ctx, true, addr) + require.NoError(t, err) + require.Len(t, reports, 1) + + // Verify the report when connection check fails. + report := reports[0] + require.Equal(t, fn.Some(false), report.ConnectionSuccess) + require.False(t, report.KnownServer) + require.Error(t, report.Error) + require.ErrorIs(t, report.Error, ErrUniConnFailed) + + // Verify expectations were met + mockDB.AssertExpectations(t) + mockSyncer.AssertExpectations(t) + }) + + t.Run("skip known servers", func(t *testing.T) { + t.Parallel() + + existingServer := NewServerAddrFromStr("existing.example.com") + + mockDB := &MockFederationDB{} + mockSyncer := &MockSyncer{} + + // Set up expectations - only UniverseServers should be called + mockDB.On("UniverseServers", mock.Anything).Return( + []ServerAddr{existingServer}, nil) + // AddServers and SyncUniverse should NOT be called for known + // servers + + successfulServerChecker := func(addr ServerAddr) error { + return nil + } + + envoy := &FederationEnvoy{ + cfg: FederationConfig{ + FederationDB: mockDB, + UniverseSyncer: mockSyncer, + ServerChecker: successfulServerChecker, + }, + ContextGuard: &fn.ContextGuard{ + DefaultTimeout: DefaultTimeout, + Quit: make(chan struct{}), + }, + } + + reports, err := envoy.AddServer(ctx, true, existingServer) + require.NoError(t, err) + require.Len(t, reports, 1) + + // Verify the report for known server. + report := reports[0] + require.Equal(t, fn.Some(true), report.ConnectionSuccess) + require.True(t, report.KnownServer) + require.NoError(t, report.Error) + + // Verify expectations were met + mockDB.AssertExpectations(t) + mockSyncer.AssertExpectations(t) + }) +} diff --git a/universe/mock.go b/universe/mock.go new file mode 100644 index 0000000000..d2249ba9f0 --- /dev/null +++ b/universe/mock.go @@ -0,0 +1,107 @@ +package universe + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +// MockFederationDB is a mock implementation of the FederationDB interface +// for testing. +type MockFederationDB struct { + mock.Mock +} + +func (m *MockFederationDB) UniverseServers(ctx context.Context) ( + []ServerAddr, error) { + + args := m.Called(ctx) + return args.Get(0).([]ServerAddr), args.Error(1) +} + +func (m *MockFederationDB) AddServers(ctx context.Context, + addrs ...ServerAddr) error { + + args := m.Called(ctx, addrs) + return args.Error(0) +} + +func (m *MockFederationDB) RemoveServers(ctx context.Context, + addrs ...ServerAddr) error { + + args := m.Called(ctx, addrs) + return args.Error(0) +} + +func (m *MockFederationDB) LogNewSyncs(ctx context.Context, + addrs ...ServerAddr) error { + + args := m.Called(ctx, addrs) + return args.Error(0) +} + +func (m *MockFederationDB) QueryFederationSyncConfigs( + ctx context.Context) ([]*FedGlobalSyncConfig, []*FedUniSyncConfig, + error) { + + args := m.Called(ctx) + return args.Get(0).([]*FedGlobalSyncConfig), + args.Get(1).([]*FedUniSyncConfig), args.Error(2) +} + +func (m *MockFederationDB) UpsertFederationSyncConfig( + ctx context.Context, globalSyncConfigs []*FedGlobalSyncConfig, + uniSyncConfigs []*FedUniSyncConfig) error { + + args := m.Called(ctx, globalSyncConfigs, uniSyncConfigs) + return args.Error(0) +} + +func (m *MockFederationDB) UpsertFederationProofSyncLog( + ctx context.Context, uniID Identifier, leafKey LeafKey, + addr ServerAddr, syncDirection SyncDirection, + syncStatus ProofSyncStatus, + bumpSyncAttemptCounter bool) (int64, error) { + + args := m.Called(ctx, uniID, leafKey, addr, syncDirection, + syncStatus, bumpSyncAttemptCounter) + return args.Get(0).(int64), args.Error(1) +} + +func (m *MockFederationDB) QueryFederationProofSyncLog( + ctx context.Context, uniID Identifier, leafKey LeafKey, + syncDirection SyncDirection, + syncStatus ProofSyncStatus) ([]*ProofSyncLogEntry, error) { + + args := m.Called(ctx, uniID, leafKey, syncDirection, syncStatus) + return args.Get(0).([]*ProofSyncLogEntry), args.Error(1) +} + +func (m *MockFederationDB) FetchPendingProofsSyncLog( + ctx context.Context, + syncDirection *SyncDirection) ([]*ProofSyncLogEntry, error) { + + args := m.Called(ctx, syncDirection) + return args.Get(0).([]*ProofSyncLogEntry), args.Error(1) +} + +func (m *MockFederationDB) DeleteProofsSyncLogEntries( + ctx context.Context, servers ...ServerAddr) error { + + args := m.Called(ctx, servers) + return args.Error(0) +} + +// MockSyncer is a mock implementation of the Syncer interface for testing. +type MockSyncer struct { + mock.Mock +} + +// SyncUniverse implements the Syncer interface. +func (m *MockSyncer) SyncUniverse(ctx context.Context, host ServerAddr, + syncType SyncType, syncConfigs SyncConfigs, + idsToSync ...Identifier) ([]AssetSyncDiff, error) { + + args := m.Called(ctx, host, syncType, syncConfigs, idsToSync) + return args.Get(0).([]AssetSyncDiff), args.Error(1) +}