Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

*: Have keeper notify when shutting down #845

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 51 additions & 26 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (p *PostgresKeeper) usePgrewind(db *cluster.DB) bool {
return p.pgSUUsername != "" && p.pgSUPassword != "" && db.Spec.UsePgrewind
}

func (p *PostgresKeeper) updateKeeperInfo() error {
func (p *PostgresKeeper) updateKeeperInfo(shuttingDown bool) error {
p.localStateMutex.Lock()
keeperUID := p.keeperLocalState.UID
clusterUID := p.keeperLocalState.ClusterUID
Expand All @@ -601,6 +601,13 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
log.Warnf("failed to get postgres binary version: %v", err)
}

lastPGState := p.getLastPGState()
if shuttingDown {
// Tell the sentinels that this keeper is shutting down.
// No mutex is needed here because p.getLastPGState()
// returns a deep copy.
lastPGState.Healthy = false
}
keeperInfo := &cluster.KeeperInfo{
InfoUID: common.UID(),
UID: keeperUID,
Expand All @@ -610,7 +617,7 @@ func (p *PostgresKeeper) updateKeeperInfo() error {
Maj: maj,
Min: min,
},
PostgresState: p.getLastPGState(),
PostgresState: lastPGState,

CanBeMaster: p.canBeMaster,
CanBeSynchronousReplica: p.canBeSynchronousReplica,
Expand Down Expand Up @@ -801,9 +808,9 @@ func (p *PostgresKeeper) getLastPGState() *cluster.PostgresState {
}

func (p *PostgresKeeper) Start(ctx context.Context) {
endSMCh := make(chan struct{})
endPgStatecheckerCh := make(chan struct{})
endUpdateKeeperInfo := make(chan struct{})
endPostgresKeeperSM := make(chan struct{}, 1)
endUpdatePGState := make(chan struct{}, 1)
endUpdateKeeperInfo := make(chan struct{}, 1)

var err error
var cd *cluster.ClusterData
Expand All @@ -828,9 +835,29 @@ func (p *PostgresKeeper) Start(ctx context.Context) {

_ = p.pgm.StopIfStarted(true)

smTimerCh := time.NewTimer(0).C
updatePGStateTimerCh := time.NewTimer(0).C
updateKeeperInfoTimerCh := time.NewTimer(0).C
postgresKeeperSMTimer := time.NewTimer(0)
updatePGStateTimer := time.NewTimer(0)
updateKeeperInfoTimer := time.NewTimer(0)

// Make sure there is a single goroutine sending KeeperInfo updates.
// We want to force an order in calls to updateKeeperInfo, so the
// KeeperInfo sent when the context is cancelled is guaranteed to be
// the last one.
doUpdateKeeperInfo := make(chan struct{}, 1)
go func() {
defer close(endUpdateKeeperInfo)
for range doUpdateKeeperInfo {
if err = p.updateKeeperInfo(false); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
endUpdateKeeperInfo <- struct{}{}
}
// Once the channel is closed, send a dying gasp
if err = p.updateKeeperInfo(true); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
}()

for {
// The sleepInterval can be updated during normal execution. Ensure we regularly
// refresh the metric to account for those changes.
Expand All @@ -842,39 +869,37 @@ func (p *PostgresKeeper) Start(ctx context.Context) {
if err = p.pgm.StopIfStarted(true); err != nil {
log.Errorw("failed to stop pg instance", zap.Error(err))
}
close(doUpdateKeeperInfo) // This will notify sentinels we are shutting down
for range endUpdateKeeperInfo { // Wait until the dying gasp is sent
}
p.end <- nil
return

case <-smTimerCh:
case <-postgresKeeperSMTimer.C:
go func() {
p.postgresKeeperSM(ctx)
endSMCh <- struct{}{}
endPostgresKeeperSM <- struct{}{}
}()

case <-endSMCh:
smTimerCh = time.NewTimer(p.sleepInterval).C
case <-endPostgresKeeperSM:
postgresKeeperSMTimer.Reset(p.sleepInterval)

case <-updatePGStateTimerCh:
// updateKeeperInfo two times faster than the sleep interval
case <-updatePGStateTimer.C:
// updatePGState two times faster than the sleep interval
go func() {
p.updatePGState(ctx)
endPgStatecheckerCh <- struct{}{}
endUpdatePGState <- struct{}{}
}()

case <-endPgStatecheckerCh:
// updateKeeperInfo two times faster than the sleep interval
updatePGStateTimerCh = time.NewTimer(p.sleepInterval / 2).C
case <-endUpdatePGState:
// updatePGState two times faster than the sleep interval
updatePGStateTimer.Reset(p.sleepInterval / 2)

case <-updateKeeperInfoTimerCh:
go func() {
if err := p.updateKeeperInfo(); err != nil {
log.Errorw("failed to update keeper info", zap.Error(err))
}
endUpdateKeeperInfo <- struct{}{}
}()
case <-updateKeeperInfoTimer.C:
doUpdateKeeperInfo <- struct{}{}

case <-endUpdateKeeperInfo:
updateKeeperInfoTimerCh = time.NewTimer(p.sleepInterval).C
updateKeeperInfoTimer.Reset(p.sleepInterval)
}
}
}
Expand Down