diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 017a0c302..3b0feec0c 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -471,6 +471,28 @@ func (mgtr *Migrator) checkAbort() error { return nil } +// drainGhostTableMigrated consumes the "GhostTableMigrated" changelog signal that +// initiateApplier emits once the ghost table is in place. It must be called on the +// instant-DDL success path before finalCleanup runs. +// +// The streamer's changelog listener callback (onChangelogStateEvent) publishes this +// signal synchronously while holding the streamer's listenersMutex. On the normal +// migration path there is a dedicated receiver (see below in Migrate), but the +// instant-DDL path returns early and would otherwise never receive. The blocked send +// keeps listenersMutex held forever, so when finalCleanup closes the binlog reader +// its rows-event decode callback (shouldDecodeRowsEvent) deadlocks trying to acquire +// the same mutex. Draining the signal here releases the listener and avoids the +// deadlock. +// +// Resume migrations never emit the signal (initiateApplier only writes it when +// !Revert && !Resume), so there is nothing to drain in that case. +func (mgtr *Migrator) drainGhostTableMigrated() { + if mgtr.migrationContext.Resume { + return + } + <-mgtr.ghostTableMigrated +} + // Migrate executes the complete migration logic. This is *the* major gh-ost function. func (mgtr *Migrator) Migrate() (err error) { mgtr.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.OriginalTableName)) @@ -535,6 +557,7 @@ func (mgtr *Migrator) Migrate() (err error) { } else { mgtr.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT") if err := mgtr.applier.AttemptInstantDDL(); err == nil { + mgtr.drainGhostTableMigrated() if err := mgtr.finalCleanup(); err != nil { return nil } diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 2b76a2360..ab6095218 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -35,6 +35,59 @@ import ( "github.com/testcontainers/testcontainers-go" ) +func TestMigratorDrainGhostTableMigrated(t *testing.T) { + t.Run("receives the GhostTableMigrated signal", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrator := NewMigrator(migrationContext, "test") + + // Emulate the streamer's changelog listener: onChangelogStateEvent publishes + // the signal synchronously and only returns once a receiver consumes it. + sendErr := make(chan error, 1) + go func() { + sendErr <- base.SendWithContext(migrationContext.GetContext(), migrator.ghostTableMigrated, true) + }() + + done := make(chan struct{}) + go func() { + migrator.drainGhostTableMigrated() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("drainGhostTableMigrated blocked; the signal was not consumed") + } + + select { + case err := <-sendErr: + require.NoError(t, err, "publisher should have been unblocked by the drain") + case <-time.After(2 * time.Second): + t.Fatal("publisher still blocked after drain") + } + }) + + t.Run("skips draining for resume migrations", func(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.Resume = true + migrator := NewMigrator(migrationContext, "test") + + // A resume migration never emits the signal, so the drain must return + // immediately instead of blocking forever on the empty channel. + done := make(chan struct{}) + go func() { + migrator.drainGhostTableMigrated() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("drainGhostTableMigrated blocked for a resume migration") + } + }) +} + func TestMigratorOnChangelogEvent(t *testing.T) { migrationContext := base.NewMigrationContext() migrator := NewMigrator(migrationContext, "1.2.3") diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go index b887eb098..60c934660 100644 --- a/go/logic/streamer_test.go +++ b/go/logic/streamer_test.go @@ -7,7 +7,9 @@ import ( "testing" "time" + "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/mysql" @@ -287,6 +289,72 @@ func TestEventsStreamerShouldDecodeRowsEvent(t *testing.T) { } } +func TestEventsStreamerInstantDDLDeadlockIsResolvedByDraining(t *testing.T) { + // Regression test for the instant-DDL deadlock. It reproduces the exact + // mechanism and proves that draining the GhostTableMigrated signal (what + // Migrator.drainGhostTableMigrated does on the instant-DDL success path) + // resolves it. + // + // notifyListeners invokes the changelog listener synchronously while holding + // listenersMutex. The listener publishes on an unbuffered channel (mirroring + // onChangelogStateEvent -> SendWithContext) and blocks until the signal is + // received. Meanwhile the binlog reader's shouldDecodeRowsEvent needs the same + // mutex and blocks as well. Without a receiver both stay blocked forever. + migrationContext := newTestMigrationContext() + streamer := NewEventsStreamer(migrationContext) + + ghostTableMigrated := make(chan bool) // unbuffered, mirrors Migrator.ghostTableMigrated + + err := streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error { + return base.SendWithContext(migrationContext.GetContext(), ghostTableMigrated, true) + }) + require.NoError(t, err) + + entry := &binlog.BinlogEntry{ + DmlEvent: binlog.NewBinlogDMLEvent(testMysqlDatabase, testMysqlTableName, binlog.InsertDML), + } + + notifyReturned := make(chan struct{}) + go func() { + streamer.notifyListeners(entry) // holds listenersMutex, blocks on the listener's send + close(notifyReturned) + }() + + decodeReturned := make(chan bool, 1) + go func() { + decodeReturned <- streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName) + }() + + // Both goroutines are blocked and cannot progress until the signal is drained: + // notifyListeners on the send, shouldDecodeRowsEvent on the mutex. + select { + case <-notifyReturned: + t.Fatal("notifyListeners returned before draining; the test no longer reproduces the deadlock") + case <-time.After(200 * time.Millisecond): + } + + // The fix: the instant-DDL path drains the signal before finalCleanup. + select { + case <-ghostTableMigrated: + case <-time.After(2 * time.Second): + t.Fatal("GhostTableMigrated signal was never published") + } + + // Draining releases the listener, so notifyListeners returns and frees the mutex, + // which unblocks the decode path. + select { + case <-notifyReturned: + case <-time.After(2 * time.Second): + t.Fatal("notifyListeners still blocked after drain: deadlock not resolved") + } + select { + case decoded := <-decodeReturned: + require.True(t, decoded, "registered table should be decoded") + case <-time.After(2 * time.Second): + t.Fatal("shouldDecodeRowsEvent still blocked after drain: mutex was not released") + } +} + func TestEventsStreamer(t *testing.T) { if testing.Short() { t.Skip("skipping events streamer test suite in short mode")