Skip to content

Commit

Permalink
Merge branch 'main' into 236-backup
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 authored Dec 10, 2024
2 parents 53f04ed + 73f3a8e commit f68f21e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
40 changes: 36 additions & 4 deletions pgserver/logrepl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/apecloud/myduckserver/adapter"
"github.com/apecloud/myduckserver/binlog"
"github.com/apecloud/myduckserver/catalog"
"github.com/apecloud/myduckserver/delta"
"github.com/apecloud/myduckserver/pgtypes"
"github.com/dolthub/go-mysql-server/sql"
Expand Down Expand Up @@ -814,8 +815,8 @@ func (r *LogicalReplicator) processMessage(
// No old tuple provided; it means the key columns are unchanged.
// It's fine not to append a delete event to the delta in this case.
// However, the delta appender implements an optimization that
// uses INSERT instead of UPSERT+DELETE when there is no deletion in a batch.
// We need to enforce the use of UPSERT here because the deletion count is zero.
// uses INSERT instead of UPSERT+DELETE or DELETE+INSERT when there is no deletion in a batch.
// We need to enforce the latter code path here because the deletion count is zero.
err = r.append(state, logicalMsg.RelationID, nil, binlog.DeleteRowEvent, binlog.UpdateRowEvent, true)
}
if err != nil {
Expand All @@ -836,9 +837,9 @@ func (r *LogicalReplicator) processMessage(
if !state.processMessages {
r.logger.Debugf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, xld.ServerWALEnd)
return false, nil
// Determine which columns to use based on OldTupleType
}

// Determine which columns to use based on OldTupleType
switch logicalMsg.OldTupleType {
case pglogrepl.UpdateMessageTupleTypeKey:
err = r.append(state, logicalMsg.RelationID, logicalMsg.OldTuple.Columns, binlog.DeleteRowEvent, binlog.DeleteRowEvent, true)
Expand All @@ -858,7 +859,27 @@ func (r *LogicalReplicator) processMessage(
state.inTxnStmtID += 1

case *pglogrepl.TruncateMessageV2:
r.logger.Debugf("truncate for xid %d\n", logicalMsg.Xid)
if !state.processMessages {
r.logger.Debugf("Received stale message, ignoring. Last written LSN: %s Message LSN: %s", state.lastWrittenLSN, xld.ServerWALEnd)
return false, nil
}

r.logger.Debugf("Truncate message: xid %d\n", logicalMsg.Xid)

// Flush the delta buffer first
r.flushDeltaBuffer(state, nil, nil, delta.DMLStmtFlushReason)

// Truncate the tables
for _, relationID := range logicalMsg.RelationIDs {
if err := r.truncate(state, relationID); err != nil {
return false, err
}
}

state.dirtyTxn = true
state.dirtyStream = true
state.inTxnStmtID += 1

case *pglogrepl.TypeMessageV2:
r.logger.Debugf("typeMessage for xid %d\n", logicalMsg.Xid)
case *pglogrepl.OriginMessage:
Expand Down Expand Up @@ -1084,6 +1105,17 @@ func (r *LogicalReplicator) append(state *replicationState, relationID uint32, t
return nil
}

func (r *LogicalReplicator) truncate(state *replicationState, relationID uint32) error {
rel, ok := state.relations[relationID]
if !ok {
return fmt.Errorf("unknown relation ID %d", relationID)
}

r.logger.Debugf("Truncating table %s.%s\n", rel.Namespace, rel.RelationName)
_, err := adapter.ExecInTxn(state.replicaCtx, `TRUNCATE `+catalog.ConnectIdentifiersANSI(rel.Namespace, rel.RelationName))
return err
}

func tupleDataFormat(dataType uint8) int16 {
switch dataType {
case pglogrepl.TupleDataTypeBinary:
Expand Down
23 changes: 23 additions & 0 deletions pgserver/logrepl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,29 @@ var replicationTests = []ReplicationTest{
},
},
},
{
Name: "Truncate table",
SetUpScript: []string{
dropReplicationSlot,
createReplicationSlot,
startReplication,
"/* replica */ drop table if exists public.test",
"drop table if exists public.test",
"CREATE TABLE public.test (id INT primary key, name varchar(10))",
"INSERT INTO public.test VALUES (1, 'one'), (2, 'two'), (3, 'three')",
"TRUNCATE TABLE public.test",
"INSERT INTO public.test VALUES (4, 'four')",
waitForCatchup,
},
Assertions: []ScriptTestAssertion{
{
Query: "/* replica */ SELECT * FROM public.test order by id",
Expected: []sql.Row{
{int32(4), "four"},
},
},
},
},
}

func TestReplication(t *testing.T) {
Expand Down

0 comments on commit f68f21e

Please sign in to comment.