Skip to content

Commit

Permalink
fix: enable the replication of MySQL's TRUNCATE statement (#275)
Browse files Browse the repository at this point in the history
* test: add TestTruncateTable
* Implement sql.TruncateableTable
  • Loading branch information
fanyang01 authored Dec 9, 2024
1 parent 2950610 commit 2be0e15
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 0 deletions.
1 change: 1 addition & 0 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (b *DuckBuilder) Build(ctx *sql.Context, root sql.Node, r sql.Row) (sql.Row
switch n.(type) {
case *plan.CreateDB, *plan.DropDB, *plan.DropTable, *plan.RenameTable,
*plan.CreateTable, *plan.AddColumn, *plan.RenameColumn, *plan.DropColumn, *plan.ModifyColumn,
*plan.Truncate,
*plan.CreateIndex, *plan.DropIndex, *plan.AlterIndex, *plan.ShowIndexes,
*plan.ShowTables, *plan.ShowCreateTable, *plan.ShowColumns,
*plan.ShowBinlogs, *plan.ShowBinlogStatus, *plan.ShowWarnings,
Expand Down
25 changes: 25 additions & 0 deletions binlogreplication/binlog_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,31 @@ func TestCharsetsAndCollations(t *testing.T) {
require.NoError(t, rows.Close())
}

// TestTruncateTable tests that TRUNCATE TABLE is correctly replicated.
func TestTruncateTable(t *testing.T) {
defer teardown(t)
startSqlServersWithSystemVars(t, duckReplicaSystemVars)
startReplicationAndCreateTestDb(t, mySqlPort)

// Create a table and insert some data
primaryDatabase.MustExec("create table t (pk int primary key);")
primaryDatabase.MustExec("insert into t values (1), (2), (3);")

// Verify the data on the replica
waitForReplicaToCatchUp(t)
requireReplicaResults(t, "select * from db01.t order by pk;", [][]any{{"1"}, {"2"}, {"3"}})

// TRUNCATE TABLE and verify the replica
primaryDatabase.MustExec("truncate table t;")
waitForReplicaToCatchUp(t)
requireReplicaResults(t, "select * from db01.t;", [][]any{})

// Insert some new data and verify the replica
primaryDatabase.MustExec("insert into t values (4), (5), (6);")
waitForReplicaToCatchUp(t)
requireReplicaResults(t, "select * from db01.t order by pk;", [][]any{{"4"}, {"5"}, {"6"}})
}

//
// Test Helper Functions
//
Expand Down
11 changes: 11 additions & 0 deletions catalog/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var _ sql.IndexAddressableTable = (*Table)(nil)
var _ sql.InsertableTable = (*Table)(nil)
var _ sql.UpdatableTable = (*Table)(nil)
var _ sql.DeletableTable = (*Table)(nil)
var _ sql.TruncateableTable = (*Table)(nil)
var _ sql.ReplaceableTable = (*Table)(nil)
var _ sql.CommentedTable = (*Table)(nil)

Expand Down Expand Up @@ -333,6 +334,16 @@ func (t *Table) Deleter(*sql.Context) sql.RowDeleter {
return nil
}

// Truncate implements sql.TruncateableTable.
func (t *Table) Truncate(ctx *sql.Context) (int, error) {
result, err := adapter.ExecCatalog(ctx, `TRUNCATE TABLE `+FullTableName(t.db.catalog, t.db.name, t.name))
if err != nil {
return 0, err
}
affected, err := result.RowsAffected()
return int(affected), err
}

// Replacer implements sql.ReplaceableTable.
func (t *Table) Replacer(*sql.Context) sql.RowReplacer {
hasKey := len(t.schema.PkOrdinals) > 0 || !sql.IsKeyless(t.schema.Schema)
Expand Down

0 comments on commit 2be0e15

Please sign in to comment.