Skip to content

Commit

Permalink
[WIP] Support mysqlsh -- load-dump/copy-instance
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 committed Oct 15, 2024
1 parent 5598311 commit 0f0271a
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 4 deletions.
2 changes: 2 additions & 0 deletions backend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (sess Session) PersistGlobal(sysVarName string, value interface{}) error {
if _, _, ok := sql.SystemVariables.GetGlobal(sysVarName); !ok {
return sql.ErrUnknownSystemVariable.New(sysVarName)
}
sess.GetLogger().Tracef("Persisting global variable %s = %v", sysVarName, value)
_, err := sess.ExecContext(
context.Background(),
catalog.InternalTables.PersistentVariable.UpsertStmt(),
Expand Down Expand Up @@ -162,6 +163,7 @@ func (sess Session) GetPersistedValue(k string) (interface{}, error) {
catalog.InternalTables.PersistentVariable.SelectStmt(),
k,
).Scan(&value, &vtype)
sess.GetLogger().Tracef("Getting persisted global variable %s = %s [%s]", k, value, vtype)
switch {
case err == stdsql.ErrNoRows:
return nil, nil
Expand Down
26 changes: 22 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
package main

import (
"context"
"flag"
"fmt"
"path/filepath"

"github.com/apecloud/myduckserver/backend"
"github.com/apecloud/myduckserver/catalog"
"github.com/apecloud/myduckserver/myfunc"
"github.com/apecloud/myduckserver/replica"
"github.com/apecloud/myduckserver/transpiler"
sqle "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"

_ "github.com/marcboeker/go-duckdb"
Expand All @@ -42,15 +44,27 @@ var (
port = 3306
dataDirectory = "."
dbFileName = "mysql.db"
dbFilePath string
logLevel = int(logrus.InfoLevel)

replicaOptions replica.ReplicaOptions
)

func init() {
flag.StringVar(&address, "address", address, "The address to bind to.")
flag.IntVar(&port, "port", port, "The port to bind to.")
flag.StringVar(&dataDirectory, "datadir", dataDirectory, "The directory to store the database.")
flag.IntVar(&logLevel, "loglevel", logLevel, "The log level to use.")

// The following options need to be set for MySQL Shell's utilities to work properly.

// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_host
flag.StringVar(&replicaOptions.ReportHost, "report-host", replicaOptions.ReportHost, "The host name or IP address of the replica to be reported to the source during replica registration.")
// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_port
flag.IntVar(&replicaOptions.ReportPort, "report-port", replicaOptions.ReportPort, "The TCP/IP port number for connecting to the replica, to be reported to the source during replica registration.")
// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_user
flag.StringVar(&replicaOptions.ReportUser, "report-user", replicaOptions.ReportUser, "The account user name of the replica to be reported to the source during replica registration.")
// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_password
flag.StringVar(&replicaOptions.ReportPassword, "report-password", replicaOptions.ReportPassword, "The account password of the replica to be reported to the source during replica registration.")
}

func ensureSQLTranslate() {
Expand All @@ -63,9 +77,11 @@ func ensureSQLTranslate() {
func main() {
flag.Parse()

logrus.SetLevel(logrus.Level(logLevel))
if replicaOptions.ReportPort == 0 {
replicaOptions.ReportPort = port
}

dbFilePath = filepath.Join(dataDirectory, dbFileName)
logrus.SetLevel(logrus.Level(logLevel))

ensureSQLTranslate()

Expand All @@ -81,11 +97,13 @@ func main() {

builder := backend.NewDuckBuilder(engine.Analyzer.ExecBuilder, pool)
engine.Analyzer.ExecBuilder = builder
engine.Analyzer.Catalog.RegisterFunction(sql.NewContext(context.Background()), myfunc.ExtraBuiltIns...)

if err := setPersister(provider, engine); err != nil {
logrus.Fatalln("Failed to set the persister:", err)
}

replica.RegisterReplicaOptions(&replicaOptions)
replica.RegisterReplicaController(provider, engine, pool, builder)

config := server.Config{
Expand Down
50 changes: 50 additions & 0 deletions myfunc/ps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package myfunc

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression/function"
"github.com/dolthub/go-mysql-server/sql/types"
)

type PSCurrentThreadID struct {
function.NoArgFunc
}

func (c PSCurrentThreadID) IsNonDeterministic() bool {
return true
}

var _ sql.FunctionExpression = PSCurrentThreadID{}
var _ sql.CollationCoercible = PSCurrentThreadID{}

func NewPSCurrentThreadID() sql.Expression {
return PSCurrentThreadID{
NoArgFunc: function.NoArgFunc{"ps_current_thread_id", types.Uint64},
}
}

// FunctionName implements sql.FunctionExpression
func (c PSCurrentThreadID) FunctionName() string {
return "ps_current_thread_id"
}

// Description implements sql.FunctionExpression
func (c PSCurrentThreadID) Description() string {
return "Returns a BIGINT UNSIGNED value representing the Performance Schema thread ID assigned to the current connection."
}

// CollationCoercibility implements the interface sql.CollationCoercible.
func (PSCurrentThreadID) CollationCoercibility(ctx *sql.Context) (collation sql.CollationID, coercibility byte) {
return sql.Collation_utf8mb3_general_ci, 3
}

// Eval implements sql.Expression
func (c PSCurrentThreadID) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
// Golang discourages the use of thread ID or goroutine ID as they are not stable, so we use connection ID instead.
return uint64(ctx.ID()), nil
}

// WithChildren implements sql.Expression
func (c PSCurrentThreadID) WithChildren(children ...sql.Expression) (sql.Expression, error) {
return function.NoArgFuncWithChildren(c, children)
}
7 changes: 7 additions & 0 deletions myfunc/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package myfunc

import "github.com/dolthub/go-mysql-server/sql"

var ExtraBuiltIns = []sql.Function{
sql.Function0{Name: "ps_current_thread_id", Fn: NewPSCurrentThreadID},
}
52 changes: 52 additions & 0 deletions replica/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package replica

import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
)

// ReplicaOptions holds the options for a replica server.
// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html
type ReplicaOptions struct {
ReportHost string
ReportPort int
ReportUser string
ReportPassword string
}

func RegisterReplicaOptions(options *ReplicaOptions) {
sql.SystemVariables.AddSystemVariables([]sql.SystemVariable{
&sql.MysqlSystemVariable{
Name: "report_host",
Scope: sql.GetMysqlScope(sql.SystemVariableScope_Global),
Dynamic: false,
SetVarHintApplies: false,
Type: types.NewSystemStringType("report_host"),
Default: options.ReportHost,
},
&sql.MysqlSystemVariable{
Name: "report_port",
Scope: sql.GetMysqlScope(sql.SystemVariableScope_Global),
Dynamic: false,
SetVarHintApplies: false,
Type: types.NewSystemIntType("report_port", 0, 65535, false),
Default: int64(options.ReportPort),
},
&sql.MysqlSystemVariable{
Name: "report_user",
Scope: sql.GetMysqlScope(sql.SystemVariableScope_Global),
Dynamic: false,
SetVarHintApplies: false,
Type: types.NewSystemStringType("report_user"),
Default: options.ReportUser,
},
&sql.MysqlSystemVariable{
Name: "report_password",
Scope: sql.GetMysqlScope(sql.SystemVariableScope_Global),
Dynamic: false,
SetVarHintApplies: false,
Type: types.NewSystemStringType("report_password"),
Default: options.ReportPassword,
},
})
}

0 comments on commit 0f0271a

Please sign in to comment.