From 3420ea80e949f73cb8ea834992ec68ea7772b9c5 Mon Sep 17 00:00:00 2001
From: Peter Bollen
Date: Thu, 2 Jul 2026 17:05:52 +0200
Subject: [PATCH] Fix instant-DDL deadlock on GhostTableMigrated changelog
signal
When --attempt-instant-ddl succeeds, Migrate() returns early after
finalCleanup() without ever receiving from the ghostTableMigrated channel.
initiateApplier writes a GhostTableMigrated changelog row, and the streamer's
listener callback (onChangelogStateEvent) publishes that signal synchronously
via SendWithContext while holding EventsStreamer.listenersMutex. On the normal
path a receiver drains it, but the instant-DDL path skips that receive, so the
send blocks forever while still holding the mutex. finalCleanup() then closes
the binlog reader, whose rows-event decode callback (shouldDecodeRowsEvent)
needs the same mutex. Because Close() waits for that goroutine to exit, the
result is a permanent deadlock.
Fix: drain the GhostTableMigrated signal on the instant-DDL success path before
finalCleanup, mirroring the existing receive on the normal path. Resume
migrations never emit the signal, so draining is guarded by !Resume.
Add regression tests: a migrator-level test for drainGhostTableMigrated (it
drains the signal and unblocks the publisher, and returns immediately for
resume migrations) and a streamer-level test that reproduces the deadlock
mechanism and shows that draining the signal resolves it.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
go/logic/migrator.go | 23 +++++++++++++
go/logic/migrator_test.go | 53 ++++++++++++++++++++++++++++++
go/logic/streamer_test.go | 68 +++++++++++++++++++++++++++++++++++++++
3 files changed, 144 insertions(+)
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 017a0c302..3b0feec0c 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -471,6 +471,28 @@ func (mgtr *Migrator) checkAbort() error {
return nil
}
+// drainGhostTableMigrated consumes the "GhostTableMigrated" changelog signal that
+// initiateApplier emits once the ghost table is in place. It must be called on the
+// instant-DDL success path before finalCleanup runs.
+//
+// The streamer's changelog listener callback (onChangelogStateEvent) publishes this
+// signal synchronously while holding the streamer's listenersMutex. On the normal
+// migration path there is a dedicated receiver (see below in Migrate), but the
+// instant-DDL path returns early and would otherwise never receive. The blocked send
+// keeps listenersMutex held forever, so when finalCleanup closes the binlog reader
+// its rows-event decode callback (shouldDecodeRowsEvent) deadlocks trying to acquire
+// the same mutex. Draining the signal here releases the listener and avoids the
+// deadlock.
+//
+// Resume migrations never emit the signal (initiateApplier only writes it when
+// !Revert && !Resume), so there is nothing to drain in that case.
+func (mgtr *Migrator) drainGhostTableMigrated() {
+ if mgtr.migrationContext.Resume {
+ return
+ }
+ <-mgtr.ghostTableMigrated
+}
+
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
func (mgtr *Migrator) Migrate() (err error) {
mgtr.migrationContext.Log.Infof("Migrating %s.%s", sql.EscapeName(mgtr.migrationContext.DatabaseName), sql.EscapeName(mgtr.migrationContext.OriginalTableName))
@@ -535,6 +557,7 @@ func (mgtr *Migrator) Migrate() (err error) {
} else {
mgtr.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT")
if err := mgtr.applier.AttemptInstantDDL(); err == nil {
+ mgtr.drainGhostTableMigrated()
if err := mgtr.finalCleanup(); err != nil {
return nil
}
diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go
index 2b76a2360..ab6095218 100644
--- a/go/logic/migrator_test.go
+++ b/go/logic/migrator_test.go
@@ -35,6 +35,59 @@ import (
"github.com/testcontainers/testcontainers-go"
)
+func TestMigratorDrainGhostTableMigrated(t *testing.T) {
+ t.Run("receives the GhostTableMigrated signal", func(t *testing.T) {
+ migrationContext := base.NewMigrationContext()
+ migrator := NewMigrator(migrationContext, "test")
+
+ // Emulate the streamer's changelog listener: onChangelogStateEvent publishes
+ // the signal synchronously and only returns once a receiver consumes it.
+ sendErr := make(chan error, 1)
+ go func() {
+ sendErr <- base.SendWithContext(migrationContext.GetContext(), migrator.ghostTableMigrated, true)
+ }()
+
+ done := make(chan struct{})
+ go func() {
+ migrator.drainGhostTableMigrated()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(2 * time.Second):
+ t.Fatal("drainGhostTableMigrated blocked; the signal was not consumed")
+ }
+
+ select {
+ case err := <-sendErr:
+ require.NoError(t, err, "publisher should have been unblocked by the drain")
+ case <-time.After(2 * time.Second):
+ t.Fatal("publisher still blocked after drain")
+ }
+ })
+
+ t.Run("skips draining for resume migrations", func(t *testing.T) {
+ migrationContext := base.NewMigrationContext()
+ migrationContext.Resume = true
+ migrator := NewMigrator(migrationContext, "test")
+
+ // A resume migration never emits the signal, so the drain must return
+ // immediately instead of blocking forever on the empty channel.
+ done := make(chan struct{})
+ go func() {
+ migrator.drainGhostTableMigrated()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(2 * time.Second):
+ t.Fatal("drainGhostTableMigrated blocked for a resume migration")
+ }
+ })
+}
+
func TestMigratorOnChangelogEvent(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrator := NewMigrator(migrationContext, "1.2.3")
diff --git a/go/logic/streamer_test.go b/go/logic/streamer_test.go
index b887eb098..60c934660 100644
--- a/go/logic/streamer_test.go
+++ b/go/logic/streamer_test.go
@@ -7,7 +7,9 @@ import (
"testing"
"time"
+ "github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
+ "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mysql"
@@ -287,6 +289,72 @@ func TestEventsStreamerShouldDecodeRowsEvent(t *testing.T) {
}
}
+func TestEventsStreamerInstantDDLDeadlockIsResolvedByDraining(t *testing.T) {
+ // Regression test for the instant-DDL deadlock. It reproduces the exact
+ // mechanism and proves that draining the GhostTableMigrated signal (what
+ // Migrator.drainGhostTableMigrated does on the instant-DDL success path)
+ // resolves it.
+ //
+ // notifyListeners invokes the changelog listener synchronously while holding
+ // listenersMutex. The listener publishes on an unbuffered channel (mirroring
+ // onChangelogStateEvent -> SendWithContext) and blocks until the signal is
+ // received. Meanwhile the binlog reader's shouldDecodeRowsEvent needs the same
+ // mutex and blocks as well. Without a receiver both stay blocked forever.
+ migrationContext := newTestMigrationContext()
+ streamer := NewEventsStreamer(migrationContext)
+
+ ghostTableMigrated := make(chan bool) // unbuffered, mirrors Migrator.ghostTableMigrated
+
+ err := streamer.AddListener(false, testMysqlDatabase, testMysqlTableName, func(event *binlog.BinlogEntry) error {
+ return base.SendWithContext(migrationContext.GetContext(), ghostTableMigrated, true)
+ })
+ require.NoError(t, err)
+
+ entry := &binlog.BinlogEntry{
+ DmlEvent: binlog.NewBinlogDMLEvent(testMysqlDatabase, testMysqlTableName, binlog.InsertDML),
+ }
+
+ notifyReturned := make(chan struct{})
+ go func() {
+ streamer.notifyListeners(entry) // holds listenersMutex, blocks on the listener's send
+ close(notifyReturned)
+ }()
+
+ decodeReturned := make(chan bool, 1)
+ go func() {
+ decodeReturned <- streamer.shouldDecodeRowsEvent(testMysqlDatabase, testMysqlTableName)
+ }()
+
+ // Both goroutines are blocked and cannot progress until the signal is drained:
+ // notifyListeners on the send, shouldDecodeRowsEvent on the mutex.
+ select {
+ case <-notifyReturned:
+ t.Fatal("notifyListeners returned before draining; the test no longer reproduces the deadlock")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // The fix: the instant-DDL path drains the signal before finalCleanup.
+ select {
+ case <-ghostTableMigrated:
+ case <-time.After(2 * time.Second):
+ t.Fatal("GhostTableMigrated signal was never published")
+ }
+
+ // Draining releases the listener, so notifyListeners returns and frees the mutex,
+ // which unblocks the decode path.
+ select {
+ case <-notifyReturned:
+ case <-time.After(2 * time.Second):
+ t.Fatal("notifyListeners still blocked after drain: deadlock not resolved")
+ }
+ select {
+ case decoded := <-decodeReturned:
+ require.True(t, decoded, "registered table should be decoded")
+ case <-time.After(2 * time.Second):
+ t.Fatal("shouldDecodeRowsEvent still blocked after drain: mutex was not released")
+ }
+}
+
func TestEventsStreamer(t *testing.T) {
if testing.Short() {
t.Skip("skipping events streamer test suite in short mode")