From e48f9a4d63e622b4ef8a3c674f30321ea47ac9c4 Mon Sep 17 00:00:00 2001 From: warpcomdev Date: Mon, 13 Sep 2021 12:24:06 +0200 Subject: [PATCH] *: Add ability to disable sync replication when there're not enough ready standbys --- cmd/sentinel/cmd/sentinel.go | 2 +- cmd/sentinel/cmd/sentinel_test.go | 752 ++++++++++++++++++++++++++++++ doc/cluster_spec.md | 2 +- doc/syncrepl.md | 4 + internal/cluster/cluster.go | 3 - tests/integration/ha_test.go | 186 +++++--- tests/integration/utils.go | 32 +- 7 files changed, 907 insertions(+), 74 deletions(-) diff --git a/cmd/sentinel/cmd/sentinel.go b/cmd/sentinel/cmd/sentinel.go index 96a44c59a..512172e6d 100644 --- a/cmd/sentinel/cmd/sentinel.go +++ b/cmd/sentinel/cmd/sentinel.go @@ -1078,7 +1078,7 @@ func (s *Sentinel) updateCluster(cd *cluster.ClusterData, pis cluster.ProxiesInf newMasterDB.Spec.SynchronousStandbys = append(newMasterDB.Spec.SynchronousStandbys, oldMasterdb.UID) } } - if len(newMasterDB.Spec.SynchronousStandbys) == 0 { + if len(newMasterDB.Spec.SynchronousStandbys) == 0 && *clusterSpec.MinSynchronousStandbys > 0 { newMasterDB.Spec.ExternalSynchronousStandbys = []string{fakeStandbyName} } diff --git a/cmd/sentinel/cmd/sentinel_test.go b/cmd/sentinel/cmd/sentinel_test.go index 67851ce24..16a988cd4 100644 --- a/cmd/sentinel/cmd/sentinel_test.go +++ b/cmd/sentinel/cmd/sentinel_test.go @@ -5349,6 +5349,758 @@ func TestUpdateCluster(t *testing.T) { }, }, }, + // #28 One master and one standby. Synchronous replication enabled right + // now in the cluster spec, with MinSynchronousStandbys=0. + // master db not healthy: standby elected as new master since + // dbSpec.SynchronousReplication is false yet. The new master will have + // SynchronousReplication true and NO fake sync standby. + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db2", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: []string{}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 2, + Spec: cluster.ProxySpec{ + MasterDBUID: "", + EnabledProxies: []string{}, + }, + }, + }, + }, + // #29 One master and one standby. Synchronous replication already + // enabled, with MinSynchronousStandbys=0. + // master db not healthy: standby elected as new master since it's in + // the SynchronousStandbys. No fake replica is added. + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: []string{"db2"}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + SynchronousStandbys: []string{"db2"}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db2", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: []string{"db2"}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + SynchronousStandbys: []string{"db2"}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{}, + SynchronousStandbys: []string{"db1"}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 2, + Spec: cluster.ProxySpec{ + MasterDBUID: "", + EnabledProxies: []string{}, + }, + }, + }, + }, + // #30 One master and one standby. Synchronous replication already + // enabled, with MinSynchronousStandbys=0. + // standby db not healthy: standby removed from synchronousStandbys even though + // there's not better standby to choose + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: []string{"db2"}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + SynchronousStandbys: []string{"db2"}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: []string{}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + SynchronousStandbys: []string{}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: false, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + }, + // #31 One master (unhealthy) and an async standby. Synchronous replication already enabled + // enabled, with MinSynchronousStandbys=0. + // master (db1) and async (db2) with --never-synchronous-replica. + // StrictSyncRepl is set to false. Db2 is never elected as new sync, and fake replica + // is removed from db1 + { + cd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + CanBeSynchronousReplica: cluster.BoolP(false), + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: []string{}, + ExternalSynchronousStandbys: []string{"stolonfakestandby"}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + SynchronousStandbys: []string{}, + CurSynchronousStandbys: []string{}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + outcd: &cluster.ClusterData{ + Cluster: &cluster.Cluster{ + UID: "cluster1", + Generation: 1, + Spec: &cluster.ClusterSpec{ + ConvergenceTimeout: &cluster.Duration{Duration: cluster.DefaultConvergenceTimeout}, + InitTimeout: &cluster.Duration{Duration: cluster.DefaultInitTimeout}, + SyncTimeout: &cluster.Duration{Duration: cluster.DefaultSyncTimeout}, + MaxStandbysPerSender: cluster.Uint16P(cluster.DefaultMaxStandbysPerSender), + SynchronousReplication: cluster.BoolP(true), + MinSynchronousStandbys: cluster.Uint16P(0), + }, + Status: cluster.ClusterStatus{ + CurrentGeneration: 1, + Phase: cluster.ClusterPhaseNormal, + Master: "db1", + }, + }, + Keepers: cluster.Keepers{ + "keeper1": &cluster.Keeper{ + UID: "keeper1", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + }, + }, + "keeper2": &cluster.Keeper{ + UID: "keeper2", + Spec: &cluster.KeeperSpec{}, + Status: cluster.KeeperStatus{ + Healthy: true, + LastHealthyTime: now, + CanBeSynchronousReplica: cluster.BoolP(false), + }, + }, + }, + DBs: cluster.DBs{ + "db1": &cluster.DB{ + UID: "db1", + Generation: 2, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper1", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: true, + Role: common.RoleMaster, + Followers: []string{"db2"}, + SynchronousStandbys: []string{}, + ExternalSynchronousStandbys: []string{}, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + SynchronousStandbys: []string{}, + CurSynchronousStandbys: []string{}, + }, + }, + "db2": &cluster.DB{ + UID: "db2", + Generation: 1, + ChangeTime: time.Time{}, + Spec: &cluster.DBSpec{ + KeeperUID: "keeper2", + RequestTimeout: cluster.Duration{Duration: cluster.DefaultRequestTimeout}, + MaxStandbys: cluster.DefaultMaxStandbys, + AdditionalWalSenders: cluster.DefaultAdditionalWalSenders, + InitMode: cluster.DBInitModeNone, + SynchronousReplication: false, + Role: common.RoleStandby, + Followers: []string{}, + FollowConfig: &cluster.FollowConfig{ + Type: cluster.FollowTypeInternal, + DBUID: "db1", + }, + SynchronousStandbys: nil, + ExternalSynchronousStandbys: nil, + }, + Status: cluster.DBStatus{ + Healthy: true, + CurrentGeneration: 1, + }, + }, + }, + Proxy: &cluster.Proxy{ + Generation: 1, + Spec: cluster.ProxySpec{ + MasterDBUID: "db1", + EnabledProxies: []string{}, + }, + }, + }, + }, } for i, tt := range tests { diff --git a/doc/cluster_spec.md b/doc/cluster_spec.md index 92eaff96a..5415f6207 100644 --- a/doc/cluster_spec.md +++ b/doc/cluster_spec.md @@ -24,7 +24,7 @@ Some options in a running cluster specification can be changed to update the des | maxStandbysPerSender | max number of standbys for every sender. A sender can be a master or another standby (with cascading replication). | no | uint16 | 3 | | maxStandbyLag | maximum lag (from the last reported master state, in bytes) that an asynchronous standby can have to be elected in place of a failed master. | no | uint32 | 1MiB | | synchronousReplication | use synchronous replication between the master and its standbys | no | bool | false | -| minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 | +| minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value != 1 when using PostgreSQL >= 9.6). Notice that you are also allowed to set it to 0, in which case the primary will keep accepting commits even when there is no healthy sync standby. Setting `minSynchronousStandbys` to 0 is not generally advised - please see [syncrepl.md](syncrepl.md) | no | uint16 | 1 | | maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 | | additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 | | additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like internal replication slots used for standby replication) to make them "namespaced" from other replication slots. Replication slots starting with `stolon_` and not defined here (and not used for standby replication) will be dropped from the master instance. | no | []string | null | diff --git a/doc/syncrepl.md b/doc/syncrepl.md index a96b976a5..c3fc5cc0b 100644 --- a/doc/syncrepl.md +++ b/doc/syncrepl.md @@ -9,6 +9,10 @@ You can enable/disable synchronous replication at any time and the keepers will In the cluster spec you can set the `MinSynchronousStandbys` and `MaxSynchronousStandbys` values (they both defaults to 1). Having multiple synchronous standbys is a feature provided starting from [PostgreSQL 9.6](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION). Values different than 1 for postgres versions below 9.6 will be ignored. +For postgres versions over 9.6, you are also allowed to set `MinSynchronousStandbys` to 0. When set to 0, stolon will keep healthy sync standbys in the `synchronous_standby_names` list (up to `MaxSynchronousStandbys`), but will not force the primary to block if there isn't any healthy sync standby to add to the list. + +Notice that `MinSynchronousStandbys` = 0 can cause loss of committed transactions if the primary restarts while the list of synchronous standbys is empty. So this is only advised for clusters with just one primary and one sync standby, when you value availability over consistency. For availability _and_ consistency, you should set `MinSynchronousStandbys` > 0, and have at least `MinSynchronousStandbys` + 1 standbys. + ## Enable synchronous replication. Assuming that your cluster name is `mycluster` and using etcd (v3 api) listening on localhost:2379: diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index 9578c82bc..194d6635a 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -456,9 +456,6 @@ func (os *ClusterSpec) Validate() error { if *s.MaxStandbysPerSender < 1 { return fmt.Errorf("maxStandbysPerSender must be at least 1") } - if *s.MinSynchronousStandbys < 1 { - return fmt.Errorf("minSynchronousStandbys must be at least 1") - } if *s.MaxSynchronousStandbys < 1 { return fmt.Errorf("maxSynchronousStandbys must be at least 1") } diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index a32f51d4f..c8f72ae47 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -123,7 +123,21 @@ func TestInitWithMultipleKeepers(t *testing.T) { waitKeeperReady(t, sm, tks[masterUID]) } -func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinels uint8, syncRepl bool, usePgrewind bool, primaryKeeper *TestKeeper) (testKeepers, testSentinels, *TestProxy, *TestStore) { +// optionSetter implements the "functional options" pattern for ClusterSpec. +// Useful to avoid modifying all the tests that depend on 'setupServers' signature, +// whenever a new options is needed. +type optionSetter func(*cluster.ClusterSpec) + +// WithMinSync0 == true sets the MinimumSynchronousStandbys to 0 in ClusterSpec +func withMinSync0(minSync0 bool) optionSetter { + return func(s *cluster.ClusterSpec) { + if minSync0 { + s.MinSynchronousStandbys = cluster.Uint16P(0) + } + } +} + +func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinels uint8, syncRepl bool, usePgrewind bool, primaryKeeper *TestKeeper, otherOptions ...optionSetter) (testKeepers, testSentinels, *TestProxy, *TestStore) { var initialClusterSpec *cluster.ClusterSpec if primaryKeeper == nil { initialClusterSpec = &cluster.ClusterSpec{ @@ -169,6 +183,9 @@ func setupServers(t *testing.T, clusterName, dir string, numKeepers, numSentinel } } + for _, setter := range otherOptions { + setter(initialClusterSpec) + } return setupServersCustom(t, clusterName, dir, numKeepers, numSentinels, initialClusterSpec) } @@ -467,7 +484,7 @@ func TestFailoverStandbyCluster(t *testing.T) { func TestFailoverSyncReplStandbyCluster(t *testing.T) { t.Parallel() - testFailover(t, false, true) + testFailover(t, true, true) } // Tests standby elected as new master but fails to become master. Then old @@ -572,6 +589,11 @@ func TestFailoverFailedSyncRepl(t *testing.T) { } func TestFailoverFailedStandbyCluster(t *testing.T) { + t.Parallel() + testFailoverFailed(t, false, true) +} + +func TestFailoverFailedSyncStandbyCluster(t *testing.T) { t.Parallel() testFailoverFailed(t, true, true) } @@ -658,7 +680,7 @@ func TestFailoverTooMuchLagStandbyCluster(t *testing.T) { testFailoverTooMuchLag(t, true) } -func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyCluster bool) { +func testOldMasterRestart(t *testing.T, syncRepl, minSync0 bool, usePgrewind bool, standbyCluster bool) { dir, err := ioutil.TempDir("", "stolon") if err != nil { t.Fatalf("unexpected err: %v", err) @@ -679,7 +701,10 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust clusterName := uuid.NewV4().String() - tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, usePgrewind, ptk) + specOptions := []optionSetter{ + withMinSync0(minSync0), + } + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, usePgrewind, ptk, specOptions...) defer shutdown(tks, tss, tp, tstore) storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) @@ -731,24 +756,26 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust t.Fatalf("wrong number of lines, want: %d, got: %d", 1, c) } - // Add another standby so we'll have 2 standbys. With only 1 standby, - // when using synchronous replication, the test will block forever when + // If minSync0 is false, add another standby so we'll have 2 standbys. + // With only 1 standby, when using synchronous replication, the test will block forever when // writing to the new master since there's not active synchronous // standby. - tk, err := NewTestKeeper(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints) - if err != nil { - t.Fatalf("unexpected err: %v", err) - } - tks[tk.uid] = tk + if !minSync0 { + tk, err := NewTestKeeper(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tks[tk.uid] = tk - if err := tk.Start(); err != nil { - t.Fatalf("unexpected err: %v", err) - } - standbys = append(standbys, tk) + if err := tk.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + standbys = append(standbys, tk) - // Wait replicated data to standby - if err := waitLines(t, standbys[1], 1, 30*time.Second); err != nil { - t.Fatalf("unexpected err: %v", err) + // Wait replicated data to standby + if err := waitLines(t, standbys[1], 1, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } } if !standbyCluster { @@ -774,30 +801,40 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust func TestOldMasterRestart(t *testing.T) { t.Parallel() - testOldMasterRestart(t, false, false, false) + testOldMasterRestart(t, false, false, false, false) } func TestOldMasterRestartSyncRepl(t *testing.T) { t.Parallel() - testOldMasterRestart(t, true, false, false) + testOldMasterRestart(t, true, false, false, false) +} + +func TestOldMasterRestartSyncRepl0(t *testing.T) { + t.Parallel() + testOldMasterRestart(t, true, true, false, false) } func TestOldMasterRestartPgrewind(t *testing.T) { t.Parallel() - testOldMasterRestart(t, false, true, false) + testOldMasterRestart(t, false, false, true, false) } func TestOldMasterRestartSyncReplPgrewind(t *testing.T) { t.Parallel() - testOldMasterRestart(t, true, true, false) + testOldMasterRestart(t, true, false, true, false) +} + +func TestOldMasterRestartSyncRepl0Pgrewind(t *testing.T) { + t.Parallel() + testOldMasterRestart(t, true, true, true, false) } func TestOldMasterRestartStandbyCluster(t *testing.T) { t.Parallel() - testOldMasterRestart(t, false, false, true) + testOldMasterRestart(t, false, false, false, true) } -func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster bool) { +func testPartition1(t *testing.T, syncRepl, minSync0, usePgrewind bool, standbyCluster bool) { dir, err := ioutil.TempDir("", "stolon") if err != nil { t.Fatalf("unexpected err: %v", err) @@ -818,7 +855,10 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo clusterName := uuid.NewV4().String() - tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, usePgrewind, ptk) + specOptions := []optionSetter{ + withMinSync0(minSync0), + } + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, usePgrewind, ptk, specOptions...) defer shutdown(tks, tss, tp, tstore) storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port) @@ -875,24 +915,26 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo t.Fatalf("wrong number of lines, want: %d, got: %d", 1, c) } - // Add another standby so we'll have 2 standbys. With only 1 standby, - // when using synchronous replication, the test will block forever when - // writing to the new master since there's not active synchronous - // standby. - tk, err := NewTestKeeper(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints) - if err != nil { - t.Fatalf("unexpected err: %v", err) - } - tks[tk.uid] = tk + // If minSync0 is false, add another standby so we'll have 2 + // standbys. With only 1 standby, when using synchronous replication, + // the test will block forever when writing to the new master since + // there's not active synchronous standby. + if !minSync0 { + tk, err := NewTestKeeper(t, dir, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, tstore.storeBackend, storeEndpoints) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tks[tk.uid] = tk - if err := tk.Start(); err != nil { - t.Fatalf("unexpected err: %v", err) - } - standbys = append(standbys, tk) + if err := tk.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + standbys = append(standbys, tk) - // Wait replicated data to standby - if err := waitLines(t, standbys[1], 1, 30*time.Second); err != nil { - t.Fatalf("unexpected err: %v", err) + // Wait replicated data to standby + if err := waitLines(t, standbys[1], 1, 30*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } } // the proxy should connect to the right master @@ -928,27 +970,37 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo func TestPartition1(t *testing.T) { t.Parallel() - testPartition1(t, false, false, false) + testPartition1(t, false, false, false, false) } func TestPartition1SyncRepl(t *testing.T) { t.Parallel() - testPartition1(t, true, false, false) + testPartition1(t, true, false, false, false) +} + +func TestPartition1SyncRepl0(t *testing.T) { + t.Parallel() + testPartition1(t, true, true, false, false) } func TestPartition1Pgrewind(t *testing.T) { t.Parallel() - testPartition1(t, false, true, false) + testPartition1(t, false, false, true, false) } func TestPartition1SyncReplPgrewind(t *testing.T) { t.Parallel() - testPartition1(t, true, true, false) + testPartition1(t, true, false, true, false) +} + +func TestPartition1SyncRepl0Pgrewind(t *testing.T) { + t.Parallel() + testPartition1(t, true, true, true, false) } func TestPartition1StandbyCluster(t *testing.T) { t.Parallel() - testPartition1(t, false, false, true) + testPartition1(t, false, false, false, true) } func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) { @@ -1952,13 +2004,13 @@ func TestForceFailStandbyCluster(t *testing.T) { func TestForceFailSyncReplStandbyCluster(t *testing.T) { t.Parallel() - testForceFail(t, false, true) + testForceFail(t, true, true) } // TestSyncStandbyNotInSync tests that, when using synchronous replication, a // normal user cannot connect to primary db after it has restarted until all // defined synchronous standbys are in sync. -func TestSyncStandbyNotInSync(t *testing.T) { +func testSyncStandbyNotInSync(t *testing.T, minSync0 bool) { t.Parallel() dir, err := ioutil.TempDir("", "stolon") if err != nil { @@ -1966,7 +2018,10 @@ func TestSyncStandbyNotInSync(t *testing.T) { } defer os.RemoveAll(dir) clusterName := uuid.NewV4().String() - tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, true, false, nil) + clusterOpts := []optionSetter{ + withMinSync0(minSync0), + } + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, true, false, nil, clusterOpts...) defer shutdown(tks, tss, tp, tstore) storePath := filepath.Join(common.StorePrefix, clusterName) sm := store.NewKVBackedStore(tstore.store, storePath) @@ -2020,7 +2075,7 @@ func TestSyncStandbyNotInSync(t *testing.T) { if err := tp.WaitRightMaster(master, 3*cluster.DefaultProxyCheckInterval); err != nil { t.Fatalf("unexpected err: %v", err) } - // Stop the standby keeper, should also stop the database + // Stop the standby keeper, should also stop the database if minSync0 == false t.Logf("Stopping current standby keeper: %s", standby.uid) standby.Stop() // this call will block and then exit with an error when the master is restarted @@ -2044,14 +2099,25 @@ func TestSyncStandbyNotInSync(t *testing.T) { if c != 2 { t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) } - // The normal user shouldn't be able to connect - if _, err := user01db.Exec("SELECT * from table01"); err != nil { - exp := `pq: no pg_hba.conf entry for host "127.0.0.1", user "user01", database "postgres"` - if !strings.HasPrefix(err.Error(), exp) { - t.Fatalf("expected error when connecting to db as user01 starting with %q, got err: %q", exp, err.Error()) + if !minSync0 { + // The normal user shouldn't be able to connect + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + exp := `pq: no pg_hba.conf entry for host "127.0.0.1", user "user01", database "postgres"` + if !strings.HasPrefix(err.Error(), exp) { + t.Fatalf("expected error when connecting to db as user01 starting with %q, got err: %q", exp, err.Error()) + } + } else { + t.Fatalf("expected error connecting to db as user01, got no err") } } else { - t.Fatalf("expected error connecting to db as user01, got no err") + // Must wait until the standby failure is detected + // and the master is restarted without sync replicas. + if err := master.WaitPGParameter("synchronous_standby_names", "", 30*time.Second); err != nil { + t.Fatalf("unexpected error waiting for synchronous standbys to be removed, got err %q", err.Error()) + } + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + t.Fatalf("unexpected error connecting to db as user01, got err %q", err.Error()) + } } // Starting the standby keeper t.Logf("Starting current standby keeper: %s", standby.uid) @@ -2069,3 +2135,11 @@ func TestSyncStandbyNotInSync(t *testing.T) { t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) } } + +func TestSyncStandbyNotInSync(t *testing.T) { + testSyncStandbyNotInSync(t, false) +} + +func TestSyncStandbyNotInSync0(t *testing.T) { + testSyncStandbyNotInSync(t, true) +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 58d446cc8..9ac9abf0d 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -655,6 +655,24 @@ func (tk *TestKeeper) WaitDBRole(r common.Role, ptk *TestKeeper, timeout time.Du return fmt.Errorf("timeout") } +func (tk *TestKeeper) WaitPGParameter(parameter, value string, timeout time.Duration) error { + latestValue := "" + start := time.Now() + for time.Now().Add(-timeout).Before(start) { + pgParameters, err := GetPGParameters(tk) + if err != nil { + goto end + } + latestValue = pgParameters[parameter] + if latestValue == value { + return nil + } + end: + time.Sleep(sleepInterval) + } + return fmt.Errorf("timeout waiting for pgParamater %q (%q) to equal %q", parameter, latestValue, value) +} + func (tk *TestKeeper) GetPGParameters() (common.Parameters, error) { return GetPGParameters(tk) } @@ -862,19 +880,7 @@ func (tp *TestProxy) GetPGParameters() (common.Parameters, error) { } func (tp *TestProxy) WaitRightMaster(tk *TestKeeper, timeout time.Duration) error { - start := time.Now() - for time.Now().Add(-timeout).Before(start) { - pgParameters, err := GetPGParameters(tp) - if err != nil { - goto end - } - if pgParameters["port"] == tk.pgPort { - return nil - } - end: - time.Sleep(sleepInterval) - } - return fmt.Errorf("timeout") + return tk.WaitPGParameter("port", tk.pgPort, timeout) } func StolonCtl(t *testing.T, clusterName string, storeBackend store.Backend, storeEndpoints string, a ...string) error {