diff --git a/catalog/database.go b/catalog/database.go index 8e985885..a35ee1f0 100644 --- a/catalog/database.go +++ b/catalog/database.go @@ -7,6 +7,8 @@ import ( "sync" "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/configuration" + "github.com/apecloud/myduckserver/mycontext" "github.com/dolthub/go-mysql-server/sql" ) @@ -72,7 +74,7 @@ func (d *Database) tablesInsensitive(ctx *sql.Context, pattern string) ([]*Table return nil, err } for _, t := range tables { - t.WithSchema(ctx) + t.withSchema(ctx) } return tables, nil } @@ -91,7 +93,7 @@ func (d *Database) findTables(ctx *sql.Context, pattern string) ([]*Table, error if err := rows.Scan(&tblName, &comment); err != nil { return nil, ErrDuckDB.New(err) } - t := NewTable(tblName, d).WithComment(DecodeComment[any](comment.String)) + t := NewTable(tblName, d).withComment(DecodeComment[ExtraTableInfo](comment.String)) tbls = append(tbls, t) } if err := rows.Err(); err != nil { @@ -106,13 +108,13 @@ func (d *Database) Name() string { return d.name } -func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, is_temp bool) error { +func (d *Database) createAllTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string, temporary bool) error { var columns []string var columnCommentSQLs []string var fullTableName string - if is_temp { + if temporary { fullTableName = FullTableName("temp", "main", name) } else { fullTableName = FullTableName(d.catalog, d.name, name) @@ -142,7 +144,7 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim var fullColumnName string - if is_temp { + if temporary { fullColumnName = FullColumnName("temp", "main", name, col.Name) } else { fullColumnName = FullColumnName(d.catalog, d.name, name, col.Name) @@ -151,16 +153,16 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim if 
col.Comment != "" || typ.mysql.Name != "" || col.Default != nil { columnCommentSQLs = append(columnCommentSQLs, fmt.Sprintf(`COMMENT ON COLUMN %s IS '%s'`, fullColumnName, - NewCommentWithMeta[MySQLType](col.Comment, typ.mysql).Encode())) + NewCommentWithMeta(col.Comment, typ.mysql).Encode())) } } - var sqlsBuild strings.Builder + var b strings.Builder - if is_temp { - sqlsBuild.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", "))) + if temporary { + b.WriteString(fmt.Sprintf(`CREATE TEMP TABLE %s (%s`, name, strings.Join(columns, ", "))) } else { - sqlsBuild.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", "))) + b.WriteString(fmt.Sprintf(`CREATE TABLE %s (%s`, fullTableName, strings.Join(columns, ", "))) } var primaryKeys []string @@ -168,24 +170,29 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim primaryKeys = append(primaryKeys, schema.Schema[pkord].Name) } - if len(primaryKeys) > 0 { - sqlsBuild.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", "))) + // https://github.com/apecloud/myduckserver/issues/272 + if !(mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex()) { + if len(primaryKeys) > 0 { + b.WriteString(fmt.Sprintf(", PRIMARY KEY (%s)", strings.Join(primaryKeys, ", "))) + } } - sqlsBuild.WriteString(")") + b.WriteString(")") // Add comment to the table - if comment != "" { - sqlsBuild.WriteString(fmt.Sprintf("; COMMENT ON TABLE %s IS '%s'", fullTableName, NewComment[any](comment).Encode())) - } + b.WriteString(fmt.Sprintf( + "; COMMENT ON TABLE %s IS '%s'", + fullTableName, + NewCommentWithMeta(comment, ExtraTableInfo{schema.PkOrdinals}).Encode(), + )) // Add column comments for _, s := range columnCommentSQLs { - sqlsBuild.WriteString(";") - sqlsBuild.WriteString(s) + b.WriteString(";") + b.WriteString(s) } - _, err := adapter.Exec(ctx, sqlsBuild.String()) + _, err := adapter.Exec(ctx, b.String()) 
if err != nil { if IsDuckDBTableAlreadyExistsError(err) { return sql.ErrTableAlreadyExists.New(name) @@ -202,14 +209,14 @@ func (d *Database) CreateAllTable(ctx *sql.Context, name string, schema sql.Prim func (d *Database) CreateTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID, comment string) error { d.mu.Lock() defer d.mu.Unlock() - return d.CreateAllTable(ctx, name, schema, collation, comment, false) + return d.createAllTable(ctx, name, schema, collation, comment, false) } // CreateTemporaryTable implements sql.CreateTemporaryTable. func (d *Database) CreateTemporaryTable(ctx *sql.Context, name string, schema sql.PrimaryKeySchema, collation sql.CollationID) error { d.mu.Lock() defer d.mu.Unlock() - return d.CreateAllTable(ctx, name, schema, collation, "", true) + return d.createAllTable(ctx, name, schema, collation, "", true) } // DropTable implements sql.TableDropper. diff --git a/catalog/table.go b/catalog/table.go index aa4b0fad..ecc880f9 100644 --- a/catalog/table.go +++ b/catalog/table.go @@ -7,6 +7,8 @@ import ( "sync" "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/configuration" + "github.com/apecloud/myduckserver/mycontext" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" "github.com/marcboeker/go-duckdb" @@ -17,10 +19,14 @@ type Table struct { mu *sync.RWMutex name string db *Database - comment *Comment[any] // save the comment to avoid querying duckdb everytime + comment *Comment[ExtraTableInfo] // save the comment to avoid querying duckdb everytime schema sql.PrimaryKeySchema } +type ExtraTableInfo struct { + PkOrdinals []int +} + type ColumnInfo struct { ColumnName string ColumnIndex int @@ -53,19 +59,30 @@ func NewTable(name string, db *Database) *Table { } } -func (t *Table) WithComment(comment *Comment[any]) *Table { +func (t *Table) withComment(comment *Comment[ExtraTableInfo]) *Table { t.comment = comment return t } -func (t 
*Table) WithSchema(ctx *sql.Context) *Table { - t.mu.Lock() - defer t.mu.Unlock() - +func (t *Table) withSchema(ctx *sql.Context) *Table { t.schema = getPKSchema(ctx, t.db.catalog, t.db.name, t.name) + + // https://github.com/apecloud/myduckserver/issues/272 + if len(t.schema.PkOrdinals) == 0 && configuration.IsReplicationWithoutIndex() { + // Pretend that the primary key exists + for _, idx := range t.comment.Meta.PkOrdinals { + t.schema.Schema[idx].PrimaryKey = true + } + t.schema = sql.NewPrimaryKeySchema(t.schema.Schema, t.comment.Meta.PkOrdinals...) + } + return t } +func (t *Table) ExtraTableInfo() ExtraTableInfo { + return t.comment.Meta +} + // Collation implements sql.Table. func (t *Table) Collation() sql.CollationID { return sql.Collation_Default @@ -333,6 +350,11 @@ func (t *Table) CreateIndex(ctx *sql.Context, indexDef sql.IndexDef) error { t.mu.Lock() defer t.mu.Unlock() + // https://github.com/apecloud/myduckserver/issues/272 + if mycontext.IsReplicationQuery(ctx) && configuration.IsReplicationWithoutIndex() { + return nil + } + if indexDef.IsPrimary() { return fmt.Errorf("primary key cannot be created with CreateIndex, use ALTER TABLE ... 
const (
	// replicationWithoutIndex is the name of the environment variable that
	// toggles the replication-without-index mode.
	replicationWithoutIndex = "REPLICATION_WITHOUT_INDEX"
)

// IsReplicationWithoutIndex reports whether the replication-without-index mode
// is enabled.
//
// The mode is controlled by the REPLICATION_WITHOUT_INDEX environment variable,
// which is read on every call. It is enabled when the variable is unset or
// empty, or when its value is "t", "1" or "true" (compared case-insensitively).
// Any other value disables it.
func IsReplicationWithoutIndex() bool {
	raw := os.Getenv(replicationWithoutIndex)
	if raw == "" {
		// Unset or empty: the mode is on by default.
		return true
	}

	value := strings.ToLower(raw)
	return value == "t" || value == "1" || value == "true"
}
ctx.GetLogger().Debug("Insert SQL: ", b.String()) result, err := tx.ExecContext(ctx, sql) if err != nil { @@ -395,7 +400,8 @@ func (c *DeltaController) handleZeroDelete( return nil } -func (c *DeltaController) handleGeneralCase( +// Materialize the condensed delta view as a temporary table. +func (c *DeltaController) materializeCondensedDelta( ctx *sql.Context, conn *stdsql.Conn, tx *stdsql.Tx, @@ -407,11 +413,12 @@ func (c *DeltaController) handleGeneralCase( if err != nil { return err } + defer release() - // Create a temporary table to store the latest delta view condenseDeltaSQL := buildCondenseDeltaSQL(viewName, appender) + + // Create a temporary table to store the latest delta view result, err := tx.ExecContext(ctx, "CREATE OR REPLACE TEMP TABLE delta AS "+condenseDeltaSQL) - release() // release the Arrow view immediately if err != nil { return err } @@ -419,8 +426,6 @@ func (c *DeltaController) handleGeneralCase( if err != nil { return err } - stats.DeltaSize += affected - defer tx.ExecContext(ctx, "DROP TABLE IF EXISTS temp.main.delta") if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { log.WithFields(logrus.Fields{ @@ -430,14 +435,32 @@ func (c *DeltaController) handleGeneralCase( }).Debug("Delta created") } + stats.DeltaSize += affected + return nil +} + +func (c *DeltaController) handleGeneralCase( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + if err := c.materializeCondensedDelta(ctx, conn, tx, table, appender, stats); err != nil { + return err + } + defer tx.ExecContext(ctx, "DROP TABLE IF EXISTS temp.main.delta") + qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + affected := int64(0) // Insert or replace new rows (action = INSERT) into the base table. 
insertSQL := "INSERT OR REPLACE INTO " + qualifiedTableName + " SELECT * EXCLUDE (" + AugmentedColumnList + ") FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.InsertRowEvent)) - result, err = tx.ExecContext(ctx, insertSQL) + result, err := tx.ExecContext(ctx, insertSQL) if err == nil { affected, err = result.RowsAffected() } @@ -502,6 +525,80 @@ func (c *DeltaController) handleGeneralCase( return nil } +// If the physical DuckDB table does not have an index, we have to handle the delta without UPSERT. +// This is the case when the environment variable `REPLICATION_WITHOUT_INDEX` is set to `true`. +// Currently, since DuckDB's indexes come with limitations and performance issues, +// we enable this feature by default. See: +// +// https://github.com/apecloud/myduckserver/issues/272 +// +// The benefit of this mode is that we can use explict DELETE+INSERT to handle the delta. +func (c *DeltaController) handleWithoutIndex( + ctx *sql.Context, + conn *stdsql.Conn, + tx *stdsql.Tx, + table tableIdentifier, + appender *DeltaAppender, + stats *FlushStats, +) error { + if err := c.materializeCondensedDelta(ctx, conn, tx, table, appender, stats); err != nil { + return err + } + defer tx.ExecContext(ctx, "DROP TABLE IF EXISTS temp.main.delta") + + qualifiedTableName := catalog.ConnectIdentifiersANSI(table.dbName, table.tableName) + affected := int64(0) + + // Delete all rows that have been modified. + // The plan for `IN` is optimized to a SEMI JOIN, + // which is more efficient than ordinary INNER JOIN. + // DuckDB does not support multiple columns in `IN` clauses, + // so we need to handle this case separately using the `row()` function. 
+ inTuple := getPrimaryKeyStruct(appender.BaseSchema()) + deleteSQL := "DELETE FROM " + qualifiedTableName + + " WHERE " + inTuple + " IN (SELECT " + inTuple + "FROM temp.main.delta)" + result, err := tx.ExecContext(ctx, deleteSQL) + if err == nil { + affected, err = result.RowsAffected() + } + if err != nil { + return err + } + stats.Deletions += affected + + if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "rows": affected, + }).Debug("Deleted") + } + + // Insert new rows (action = INSERT) into the base table. + insertSQL := "INSERT INTO " + + qualifiedTableName + + " SELECT * EXCLUDE (" + AugmentedColumnList + ") " + + "FROM temp.main.delta WHERE action = " + strconv.Itoa(int(binlog.InsertRowEvent)) + result, err = tx.ExecContext(ctx, insertSQL) + if err == nil { + affected, err = result.RowsAffected() + } + if err != nil { + return err + } + stats.Insertions += affected + + if log := ctx.GetLogger(); log.Logger.IsLevelEnabled(logrus.DebugLevel) { + log.WithFields(logrus.Fields{ + "db": table.dbName, + "table": table.tableName, + "rows": affected, + }).Debug("Inserted") + } + + return nil +} + // Helper function to build column list with timestamp handling func buildColumnList(b *strings.Builder, schema sql.Schema) { for i, col := range schema { diff --git a/mycontext/keys.go b/mycontext/keys.go new file mode 100644 index 00000000..21457e1d --- /dev/null +++ b/mycontext/keys.go @@ -0,0 +1,36 @@ +package mycontext + +import "context" + +type QueryOriginKey struct{} + +type QueryOriginKind uint8 + +const ( + FrontendQueryOrigin QueryOriginKind = iota + InternalQueryOrigin + MySQLReplicationQueryOrigin + PostgresReplicationQueryOrigin +) + +var queryOriginKey = QueryOriginKey{} + +func WithQueryOrigin(ctx context.Context, kind QueryOriginKind) context.Context { + return context.WithValue(ctx, queryOriginKey, kind) +} + +func QueryOrigin(ctx context.Context) 
QueryOriginKind { + if kind, ok := ctx.Value(queryOriginKey).(QueryOriginKind); ok { + return kind + } + return FrontendQueryOrigin +} + +func IsReplicationQuery(ctx context.Context) bool { + switch QueryOrigin(ctx) { + case MySQLReplicationQueryOrigin, PostgresReplicationQueryOrigin: + return true + default: + return false + } +} diff --git a/pgserver/logrepl/ddl.go b/pgserver/logrepl/ddl.go index 60c17200..6822c701 100644 --- a/pgserver/logrepl/ddl.go +++ b/pgserver/logrepl/ddl.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/apecloud/myduckserver/catalog" + "github.com/apecloud/myduckserver/configuration" "github.com/apecloud/myduckserver/pgtypes" ) @@ -21,18 +22,21 @@ func generateCreateTableStmt(msg *pglogrepl.RelationMessageV2) (string, error) { if i > 0 { sb.WriteString(", ") } - sb.WriteString(col.Name) + sb.WriteString(catalog.QuoteIdentifierANSI(col.Name)) sb.WriteString(" ") sb.WriteString(pgTypeName(col)) if col.Flags == 1 { keyColumns = append(keyColumns, col.Name) } } - if len(keyColumns) > 0 { + + // https://github.com/apecloud/myduckserver/issues/272 + if !configuration.IsReplicationWithoutIndex() && len(keyColumns) > 0 { sb.WriteString(", PRIMARY KEY (") sb.WriteString(strings.Join(keyColumns, ", ")) sb.WriteString(")") } + sb.WriteString(");") return sb.String(), nil } diff --git a/pgserver/logrepl/replication_test.go b/pgserver/logrepl/replication_test.go index fa2ae542..7548de8c 100644 --- a/pgserver/logrepl/replication_test.go +++ b/pgserver/logrepl/replication_test.go @@ -17,8 +17,6 @@ package logrepl_test import ( "context" "fmt" - "github.com/apecloud/myduckserver/adapter" - "github.com/jackc/pglogrepl" "log" "os" "os/exec" @@ -26,6 +24,10 @@ import ( "testing" "time" + "github.com/apecloud/myduckserver/adapter" + "github.com/apecloud/myduckserver/configuration" + "github.com/jackc/pglogrepl" + "github.com/apecloud/myduckserver/pgserver" "github.com/apecloud/myduckserver/pgserver/logrepl" 
"github.com/apecloud/myduckserver/pgtest" @@ -741,6 +743,13 @@ func runReplicationScript( } conn := connectionForQuery(t, query, connections, primaryDns) + + target, _ := clientSpecFromQueryComment(query) + if target == "replica" && configuration.IsReplicationWithoutIndex() { + // Remove the primary key index from the replica table + query = strings.Replace(query, " primary key,", ",", 1) + } + log.Println("Running setup query:", query) _, err := conn.Exec(ctx, query) require.NoError(t, err) diff --git a/replica/replication.go b/replica/replication.go index 64f07f57..5a039b84 100644 --- a/replica/replication.go +++ b/replica/replication.go @@ -28,6 +28,7 @@ import ( "github.com/apecloud/myduckserver/binlogreplication" "github.com/apecloud/myduckserver/catalog" "github.com/apecloud/myduckserver/delta" + "github.com/apecloud/myduckserver/mycontext" ) // registerReplicaController registers the replica controller into the engine @@ -36,8 +37,11 @@ func RegisterReplicaController(provider *catalog.DatabaseProvider, engine *sqle. replica := binlogreplication.MyBinlogReplicaController replica.SetEngine(engine) + stdctx := context.Background() + stdctx = mycontext.WithQueryOrigin(stdctx, mycontext.MySQLReplicationQueryOrigin) + session := backend.NewSession(memory.NewSession(sql.NewBaseSession(), provider), provider, pool) - ctx := sql.NewContext(context.Background(), sql.WithSession(session)) + ctx := sql.NewContext(stdctx, sql.WithSession(session)) ctx.SetCurrentDatabase("mysql") replica.SetExecutionContext(ctx)