Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,28 @@ func (mgtr *Migrator) checkAbort() error {
return nil
}

// drainGhostTableMigrated consumes the "GhostTableMigrated" changelog signal that
// initiateApplier emits once the ghost table is in place. It must be called on the
// instant-DDL success path before finalCleanup runs.
//
// The streamer's changelog listener callback (onChangelogStateEvent) publishes this
// signal synchronously while holding the streamer's listenersMutex. On the normal
// migration path there is a dedicated receiver in Migrate, but the instant-DDL path
// returns early and would otherwise never receive. The blocked send would keep
// listenersMutex held, so when finalCleanup closes the binlog reader its rows-event
// decode callback (shouldDecodeRowsEvent) would deadlock trying to acquire the same
// mutex. Draining the signal here releases the listener and avoids the deadlock.
//
// Resume migrations never emit the signal (initiateApplier only writes it when
// !Revert && !Resume), so there is nothing to drain in that case and the method
// returns immediately.
//
// The wait is bounded by the migration context. The publisher is expected to send
// with that same context (via base.SendWithContext), so once the context is
// cancelled the signal may never arrive; in that case the method returns without
// having received anything instead of blocking forever.
func (mgtr *Migrator) drainGhostTableMigrated() {
	if mgtr.migrationContext.Resume {
		return
	}
	select {
	case <-mgtr.ghostTableMigrated:
		// Signal consumed: the publisher's send completes and the listener returns.
	case <-mgtr.migrationContext.GetContext().Done():
		// Migration was cancelled; a context-aware publisher unblocks on its own.
	}
}

// Migrate executes the complete migration logic. This is *the* major gh-ost function.
func (mgtr *Migrator) Migrate() (err error) {
mgtr.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.OriginalTableName))
Expand Down Expand Up @@ -535,6 +557,7 @@ func (mgtr *Migrator) Migrate() (err error) {
} else {
mgtr.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT")
if err := mgtr.applier.AttemptInstantDDL(); err == nil {
mgtr.drainGhostTableMigrated()
if err := mgtr.finalCleanup(); err != nil {
return nil
}
Expand Down
53 changes: 53 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,59 @@ import (
"github.com/testcontainers/testcontainers-go"
)

// TestMigratorDrainGhostTableMigrated checks both branches of
// Migrator.drainGhostTableMigrated: it must consume a pending signal on a regular
// migration, and it must return without waiting on a resume migration.
func TestMigratorDrainGhostTableMigrated(t *testing.T) {
	// Upper bound for every wait in this test; reaching it means something hung.
	const waitLimit = 2 * time.Second

	// inBackground runs fn on its own goroutine and returns a channel that is
	// closed as soon as fn has returned.
	inBackground := func(fn func()) <-chan struct{} {
		finished := make(chan struct{})
		go func() {
			fn()
			close(finished)
		}()
		return finished
	}

	t.Run("receives the GhostTableMigrated signal", func(t *testing.T) {
		mctx := base.NewMigrationContext()
		mgtr := NewMigrator(mctx, "test")

		// Stand-in for the streamer's changelog listener: the send is synchronous
		// and only completes once somebody receives from the channel.
		publishResult := make(chan error, 1)
		go func() {
			publishResult <- base.SendWithContext(mctx.GetContext(), mgtr.ghostTableMigrated, true)
		}()

		drained := inBackground(mgtr.drainGhostTableMigrated)

		select {
		case <-time.After(waitLimit):
			t.Fatal("drainGhostTableMigrated blocked; the signal was not consumed")
		case <-drained:
		}

		select {
		case <-time.After(waitLimit):
			t.Fatal("publisher still blocked after drain")
		case sendErr := <-publishResult:
			require.NoError(t, sendErr, "publisher should have been unblocked by the drain")
		}
	})

	t.Run("skips draining for resume migrations", func(t *testing.T) {
		mctx := base.NewMigrationContext()
		mctx.Resume = true
		mgtr := NewMigrator(mctx, "test")

		// Nothing publishes the signal here, so the only way the drain can
		// finish is by returning early because of the Resume flag.
		drained := inBackground(mgtr.drainGhostTableMigrated)

		select {
		case <-time.After(waitLimit):
			t.Fatal("drainGhostTableMigrated blocked for a resume migration")
		case <-drained:
		}
	})
}

func TestMigratorOnChangelogEvent(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrator := NewMigrator(migrationContext, "1.2.3")
Expand Down
68 changes: 68 additions & 0 deletions go/logic/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"testing"
"time"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mysql"
Expand Down Expand Up @@ -287,6 +289,72 @@ func TestEventsStreamerShouldDecodeRowsEvent(t *testing.T) {
}
}

// TestEventsStreamerInstantDDLDeadlockIsResolvedByDraining is a regression test
// for the instant-DDL deadlock. It recreates the blocking mechanism and shows that
// receiving the GhostTableMigrated signal (the job of
// Migrator.drainGhostTableMigrated on the instant-DDL success path) clears it.
//
// Mechanism: notifyListeners calls the changelog listener synchronously while
// holding listenersMutex. The listener sends on an unbuffered channel (mirroring
// onChangelogStateEvent -> SendWithContext) and therefore waits for a receiver.
// The binlog reader's shouldDecodeRowsEvent needs the same mutex, so it waits too.
// With no receiver, neither side can ever make progress.
func TestEventsStreamerInstantDDLDeadlockIsResolvedByDraining(t *testing.T) {
	const (
		// How long to watch for the stall before concluding it is reproduced.
		stallWindow = 200 * time.Millisecond
		// Upper bound for each wait after the drain.
		waitLimit = 2 * time.Second
	)

	mctx := newTestMigrationContext()
	eventsStreamer := NewEventsStreamer(mctx)

	// Unbuffered on purpose, mirroring Migrator.ghostTableMigrated.
	signal := make(chan bool)

	publish := func(_ *binlog.BinlogEntry) error {
		return base.SendWithContext(mctx.GetContext(), signal, true)
	}
	require.NoError(t, eventsStreamer.AddListener(false, testMysqlDatabase, testMysqlTableName, publish))

	insertEntry := &binlog.BinlogEntry{
		DmlEvent: binlog.NewBinlogDMLEvent(testMysqlDatabase, testMysqlTableName, binlog.InsertDML),
	}

	// Takes listenersMutex and then stalls inside the listener's send.
	notifyDone := make(chan struct{})
	go func() {
		eventsStreamer.notifyListeners(insertEntry)
		close(notifyDone)
	}()

	// Contends for the same mutex from the decode path.
	decodeResult := make(chan bool, 1)
	go func() {
		decodeResult <- eventsStreamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName)
	}()

	// Before the drain, notifyListeners must still be stuck on its send.
	select {
	case <-time.After(stallWindow):
	case <-notifyDone:
		t.Fatal("notifyListeners returned before draining; the test no longer reproduces the deadlock")
	}

	// The fix under test: receive the signal, as the instant-DDL path does
	// before finalCleanup.
	select {
	case <-time.After(waitLimit):
		t.Fatal("GhostTableMigrated signal was never published")
	case <-signal:
	}

	// With the send completed, notifyListeners can return and release the mutex.
	select {
	case <-time.After(waitLimit):
		t.Fatal("notifyListeners still blocked after drain: deadlock not resolved")
	case <-notifyDone:
	}

	// The decode path can then take the mutex and report its decision.
	select {
	case <-time.After(waitLimit):
		t.Fatal("shouldDecodeRowsEvent still blocked after drain: mutex was not released")
	case shouldDecode := <-decodeResult:
		require.True(t, shouldDecode, "registered table should be decoded")
	}
}

func TestEventsStreamer(t *testing.T) {
if testing.Short() {
t.Skip("skipping events streamer test suite in short mode")
Expand Down
Loading