diff --git a/Makefile b/Makefile index 8e23a43c7..5f8af0816 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab -DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite +DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite gbase8s DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-) TEST_FLAGS ?= diff --git a/database/gbase8s/README.md b/database/gbase8s/README.md new file mode 100644 index 000000000..1d0e27835 --- /dev/null +++ b/database/gbase8s/README.md @@ -0,0 +1,14 @@ +# GBase8s + +`gbase8s://user:password@ip:port/dbname?query` + + +| URL Query | WithInstance Config | Description | +| --------------------- | ------------------- | ------------------------------------------------------------ | +| `dbname` | `DatabaseName` | The name of the database to connect to | +| `user` | | The user to sign in as | +| `password` | | The user's password | +| `ip` | | The ip to connect to. | +| `port` | | The port to bind to. | + + diff --git a/database/gbase8s/examples/migrations/1_init.down.sql b/database/gbase8s/examples/migrations/1_init.down.sql new file mode 100644 index 000000000..1b10e6fc0 --- /dev/null +++ b/database/gbase8s/examples/migrations/1_init.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS test; \ No newline at end of file diff --git a/database/gbase8s/examples/migrations/1_init.up.sql b/database/gbase8s/examples/migrations/1_init.up.sql new file mode 100644 index 000000000..12607e875 --- /dev/null +++ b/database/gbase8s/examples/migrations/1_init.up.sql @@ -0,0 +1,3 @@ +CREATE TABLE IF NOT EXISTS test ( + username VARCHAR(20) +); \ No newline at end of file diff --git a/database/gbase8s/gbase8s.go b/database/gbase8s/gbase8s.go new file mode 100644 index 000000000..888e975b5 --- /dev/null +++ b/database/gbase8s/gbase8s.go @@ -0,0 +1,358 @@ +package gbase8s + +import ( + "context" + "database/sql" + "fmt" + "io" + "net/url" + "strings" + "time" + + _ "gitee.com/GBase8s/go-gci" + "github.com/golang-migrate/migrate/v4/database" + "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" +) + +func init() { + database.Register("gbase8s", &Gbase8s{}) +} + +var ( + _ database.Driver = (*Gbase8s)(nil) + DefaultMigrationsTable = "schema_migrations" + DefaultLockTable = "schema_lock" +) + +var ( + ErrNoDatabaseName = fmt.Errorf("no database name") + ErrNilConfig = fmt.Errorf("no config") +) + +type Config struct { + MigrationsTable string + DatabaseName string + LockTable string + ForceLock bool + StatementTimeout time.Duration +} + +type Gbase8s struct { + conn *sql.Conn + db *sql.DB + isLocked atomic.Bool + config *Config +} + +func WithConnection(ctx context.Context, conn *sql.Conn, config *Config) (*Gbase8s, error) { + if config == nil { + return nil, ErrNilConfig + } + + if err := conn.PingContext(ctx); err != nil { + return nil, err + } + + gx := &Gbase8s{ + conn: conn, + db: nil, + config: config, + } + + if config.DatabaseName == "" { + query := `SELECT DBINFO('dbname') FROM systables WHERE tabid = 1` + var databaseName sql.NullString + if err := conn.QueryRowContext(ctx, query).Scan(&databaseName); err != nil { + return nil, &database.Error{OrigErr: err, Query: []byte(query)} + } + + if len(databaseName.String) == 0 { + return nil, ErrNoDatabaseName + } + + config.DatabaseName = databaseName.String + } + + if len(config.MigrationsTable) == 0 { + config.MigrationsTable = DefaultMigrationsTable + } + + if len(config.LockTable) == 0 { + config.LockTable = DefaultLockTable + } + + if err := gx.ensureLockTable(); err != nil { + return nil, err + } + + if err := gx.ensureVersionTable(); err != nil { + return nil, err + } + + return gx, nil +} + +func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { + ctx := context.Background() + + if err := instance.Ping(); err != nil { + return nil, err + } + + conn, err := instance.Conn(ctx) + if err != nil { + return nil, err + } + + gx, err := WithConnection(ctx, conn, config) + if err != nil { + return nil, err + } + + gx.db = instance + + return gx, nil +} + +func (g *Gbase8s) Open(dns string) (database.Driver, error) { + gurl, err := url.Parse(dns) + if err != nil { + return nil, err + } + + db, err := sql.Open("gbase8s", gurl.String()) + if err != nil { + return nil, err + } + + // migrationsTable := gurl.Query().Get("x-migrations-table") + // if len(migrationsTable) == 0 { + // migrationsTable = DefaultMigrationsTable + // } + + // lockTable := gurl.Query().Get("x-lock-table") + // if len(lockTable) == 0 { + // lockTable = DefaultLockTable + // } + + // forceLockQuery := gurl.Query().Get("x-force-lock") + // forceLock, err := strconv.ParseBool(forceLockQuery) + // if err != nil { + // forceLock = false + // } + + // statementTimeoutQuery := gurl.Query().Get("x-statement-timeout") + // statementTimeout, err := strconv.Atoi(statementTimeoutQuery) + // if err != nil { + // statementTimeout = 0 + // } + + migrationsTable := DefaultMigrationsTable + lockTable := DefaultLockTable + forceLock := false + statementTimeout := 0 + + gx, err := WithInstance(db, &Config{ + DatabaseName: gurl.Path, + MigrationsTable: migrationsTable, + LockTable: lockTable, + ForceLock: forceLock, + StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, + }) + if err != nil { + return nil, err + } + + return gx, nil +} + +func (g *Gbase8s) Close() error { + connErr := g.conn.Close() + var dbErr error + if g.db != nil { + dbErr = g.db.Close() + } + + if connErr != nil || dbErr != nil { + return fmt.Errorf("conn: %v, db: %v", connErr, dbErr) + } + return nil +} + +func (g *Gbase8s) Lock() error { + return database.CasRestoreOnErr(&g.isLocked, false, true, database.ErrLocked, func() error { + tx, err := g.conn.BeginTx(context.Background(), nil) + if err != nil { + return err + } + defer func() { + if err != nil { + _ = tx.Rollback() + } else { + err = tx.Commit() + } + }() + + aid, err := database.GenerateAdvisoryLockId(g.config.DatabaseName) + if err != nil { + return err + } + + query := "SELECT lock_id FROM " + g.config.LockTable + " WHERE lock_id = ?" + rows, err := tx.QueryContext(context.Background(), query, aid) + if err != nil { + return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} + } + defer rows.Close() + + if rows.Next() { + if !g.config.ForceLock { + return database.ErrLocked + } + query = "DELETE FROM " + g.config.LockTable + " WHERE lock_id = ?" + if _, err := tx.ExecContext(context.Background(), query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to force release lock", Query: []byte(query)} + } + } + + query = "INSERT INTO " + g.config.LockTable + " (lock_id) VALUES (?)" + if _, err := tx.ExecContext(context.Background(), query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} + } + + return nil + }) +} + +func (g *Gbase8s) Unlock() error { + return database.CasRestoreOnErr(&g.isLocked, true, false, database.ErrNotLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(g.config.DatabaseName) + if err != nil { + return err + } + + query := "DELETE FROM " + g.config.LockTable + " WHERE lock_id = ?" + if _, err := g.conn.ExecContext(context.Background(), query, aid); err != nil { + if strings.Contains(err.Error(), "ERROR: -206: 42000") { + // ERROR: -206: 42000 is "Table Not Exists Error" in Gbase8s + // when the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema + return nil + } + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil + }) +} + +func (g *Gbase8s) Run(migration io.Reader) error { + migr, err := io.ReadAll(migration) + if err != nil { + return err + } + + ctx := context.Background() + if g.config.StatementTimeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, g.config.StatementTimeout) + defer cancel() + } + + query := string(migr[:]) + if _, err := g.conn.ExecContext(ctx, query); err != nil { + return database.Error{OrigErr: err, Err: "migration failed", Query: migr} + } + + return nil +} + +func (g *Gbase8s) SetVersion(version int, dirty bool) error { + tx, err := g.conn.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } + + query := "DELETE FROM " + g.config.MigrationsTable + if _, err := tx.ExecContext(context.Background(), query); err != nil { + if errRollback := tx.Rollback(); errRollback != nil { + err = multierror.Append(err, errRollback) + } + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + if version >= 0 || (version == database.NilVersion && dirty) { + query := "INSERT INTO " + g.config.MigrationsTable + "(version, dirty) VALUES (?, ?)" + if _, err := tx.ExecContext(context.Background(), query, version, dirty); err != nil { + if errRollback := tx.Rollback(); errRollback != nil { + err = multierror.Append(err, errRollback) + } + return &database.Error{OrigErr: err, Query: []byte(query)} + } + } + + if err := tx.Commit(); err != nil { + return &database.Error{OrigErr: err, Err: "transaction commit failed"} + } + + return nil +} + +func (g *Gbase8s) Version() (version int, dirty bool, err error) { + query := "SELECT FIRST 1 version, dirty FROM " + g.config.MigrationsTable + err = g.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty) + if err != nil { + return database.NilVersion, false, nil + } + return version, dirty, nil +} + +func (g *Gbase8s) Drop() (err error) { + query := "SELECT tabname FROM systables WHERE tabid > 1000 AND tabtype = 'T'" + rows, err := g.conn.QueryContext(context.Background(), query) + if err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + defer rows.Close() + + var tables []string + for rows.Next() { + var table string + if err := rows.Scan(&table); err != nil { + return err + } + tables = append(tables, table) + } + + for _, tbl := range tables { + if _, err := g.conn.ExecContext(context.Background(), fmt.Sprintf("DROP TABLE IF EXISTS %s", tbl)); err != nil { + return err + } + } + return nil +} + +func (g *Gbase8s) ensureVersionTable() (err error) { + if err = g.Lock(); err != nil { + return err + } + defer func() { + if unlockErr := g.Unlock(); unlockErr != nil { + err = multierror.Append(err, unlockErr) + } + }() + + query := `CREATE TABLE IF NOT EXISTS "` + g.config.MigrationsTable + `" (version INT NOT NULL PRIMARY KEY, dirty SMALLINT NOT NULL)` + if _, err = g.conn.ExecContext(context.Background(), query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} + +func (g *Gbase8s) ensureLockTable() error { + query := `CREATE TABLE IF NOT EXISTS "` + g.config.LockTable + `" (lock_id INT NOT NULL PRIMARY KEY)` + if _, err := g.conn.ExecContext(context.Background(), query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil +} diff --git a/database/gbase8s/gbase8s_test.go b/database/gbase8s/gbase8s_test.go new file mode 100644 index 000000000..c33402674 --- /dev/null +++ b/database/gbase8s/gbase8s_test.go @@ -0,0 +1,190 @@ +package gbase8s + +import ( + "context" + "database/sql" + "database/sql/driver" + "fmt" + "log" + "testing" + + _ "gitee.com/GBase8s/go-gci" + "github.com/dhui/dktest" + "github.com/golang-migrate/migrate/v4" + dt "github.com/golang-migrate/migrate/v4/database/testing" + "github.com/golang-migrate/migrate/v4/dktesting" + _ "github.com/golang-migrate/migrate/v4/source/file" +) + +const ( + defaultPort = 9088 + userName = "gbasedbt" + userPwd = "GBase123" + dbName = "testdb" + gbaseServer = "gbase01" +) + +var ( + opts = dktest.Options{ + Env: map[string]string{"USERPASS": userPwd, "SERVERNAME": gbaseServer}, + PortRequired: true, ReadyFunc: isReady, + } + specs = []dktesting.ContainerSpec{ + {ImageName: "liaosnet/gbase8s:v8.8_3633x11_csdk_arm64", Options: opts}, + } +) + +func gbase8sConnectionString(host, port string) string { + return fmt.Sprintf("gbase8s://%s:%s@%s:%s/%s?GBASEDBTSERVER=%s&GCI_FACTORY=4&PROTOCOL=onsoctcp&delimident=1&sqlmode=oracle", + userName, userPwd, host, port, dbName, gbaseServer) +} + +func isReady(ctx context.Context, c dktest.ContainerInfo) bool { + ip, port, err := c.Port(defaultPort) + if err != nil { + return false + } + + db, err := sql.Open("gbase8s", gbase8sConnectionString(ip, port)) + if err != nil { + log.Println("open error", err) + return false + } + defer func() { + if err := db.Close(); err != nil { + log.Println("close error:", err) + } + }() + + if err = db.PingContext(ctx); err != nil { + switch err { + case driver.ErrBadConn: + return false + default: + fmt.Println(err) + } + return false + } + + return true +} + +func Test(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.Port(defaultPort) + if err != nil { + t.Fatal(err) + } + p := &Gbase8s{} + d, err := p.Open(gbase8sConnectionString(ip, port)) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + dt.Test(t, d, []byte("SELECT 1")) + }) +} + +func TestMigrate(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.Port(defaultPort) + if err != nil { + t.Fatal(err) + } + p := &Gbase8s{} + d, err := p.Open(gbase8sConnectionString(ip, port)) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "c11", d) + if err != nil { + t.Fatal(err) + } + dt.TestMigrate(t, m) + }) +} + +func TestVersion(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + expectedVersion := 1 + ip, port, err := c.Port(defaultPort) + if err != nil { + t.Fatal(err) + } + p := &Gbase8s{} + d, err := p.Open(gbase8sConnectionString(ip, port)) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + err = d.SetVersion(expectedVersion, false) + if err != nil { + t.Fatal(err) + } + version, _, err := d.Version() + if err != nil { + t.Fatal(err) + } + if version != expectedVersion { + t.Fatal("Version mismatch") + } + }) +} + +func TestDrop(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.Port(defaultPort) + if err != nil { + t.Fatal(err) + } + p := &Gbase8s{} + d, err := p.Open(gbase8sConnectionString(ip, port)) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + err = d.Drop() + if err != nil { + t.Fatal(err) + } + }) +} + +// func TestCustomQuery(t *testing.T) { +// dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { +// ip, port, err := c.Port(defaultPort) +// if err != nil { +// t.Fatal(err) +// } +// p := &Gbase8s{} +// // x-migrations-table= +// // x-lock-table= +// // x-force-lock= +// // x-statement-timeout +// _, err = p.Open(gbase8sConnectionString(ip, port) + "&x-migrations-table=mt&x-lock-table=lockt") +// if err != nil { +// t.Fatal(err) +// } +// }) +// } diff --git a/database/gbase8s/gbase8s_test.go.local b/database/gbase8s/gbase8s_test.go.local new file mode 100644 index 000000000..a5195a6f6 --- /dev/null +++ b/database/gbase8s/gbase8s_test.go.local @@ -0,0 +1,103 @@ +package gbase8s + +import ( + "testing" + + _ "gitee.com/GBase8s/go-gci" + "github.com/golang-migrate/migrate/v4" + dt "github.com/golang-migrate/migrate/v4/database/testing" + _ "github.com/golang-migrate/migrate/v4/source/file" +) + +var gbaseDNS = "gbase8s://username:userpasswd@ip:port/dbname?GBASEDBTSERVER=gbaseserver&GCI_FACTORY=4&PROTOCOL=onsoctcp&delimident=1&sqlmode=oracle" + +func Test(t *testing.T) { + p := &Gbase8s{} + d, err := p.Open(gbaseDNS) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + dt.Test(t, d, []byte("SELECT 1")) +} + +func TestMigrate(t *testing.T) { + p := &Gbase8s{} + d, err := p.Open(gbaseDNS) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "c11", d) + if err != nil { + t.Fatal(err) + } + dt.TestMigrate(t, m) +} + +func TestVersion(t *testing.T) { + expectedVersion := 1 + p := &Gbase8s{} + d, err := p.Open(gbaseDNS) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + err = d.SetVersion(expectedVersion, false) + if err != nil { + t.Fatal(err) + } + + version, _, err := d.Version() + if err != nil { + t.Fatal(err) + } + + if version != expectedVersion { + t.Fatal("Version mismatch") + } +} + +func TestDrop(t *testing.T) { + p := &Gbase8s{} + d, err := p.Open(gbaseDNS) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + err = d.Drop() + if err != nil { + t.Fatal(err) + } +} + +// func TestCustomQuery(t *testing.T) { +// p := &Gbase8s{} +// // x-migrations-table= +// // x-lock-table= +// // x-force-lock= +// // x-statement-timeout +// _, err := p.Open(gbaseDNS + "&x-migrations-table=mt&x-lock-table=lockt") +// if err != nil { +// t.Fatal(err) +// } +// } diff --git a/go.mod b/go.mod index 3c20151f2..0d3f3978f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( cloud.google.com/go/spanner v1.56.0 cloud.google.com/go/storage v1.38.0 + gitee.com/GBase8s/go-gci v0.1.1 github.com/Azure/go-autorest/autorest/adal v0.9.16 github.com/ClickHouse/clickhouse-go v1.4.3 github.com/aws/aws-sdk-go v1.49.6 diff --git a/go.sum b/go.sum index e30a51a2a..d2e27e9fb 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.38.0 h1:Az68ZRGlnNTpIBbLjSMIV2BDcwwXYlRlQzis0llkpJg= cloud.google.com/go/storage v1.38.0/go.mod h1:tlUADB0mAb9BgYls9lq+8MGkfzOXuLrnHXlpHmvFJoY= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +gitee.com/GBase8s/go-gci v0.1.1 h1:EHpIuDK4RrEcyh3UPRciEIDLYbHDRBq0agDCZu5HYfQ= +gitee.com/GBase8s/go-gci v0.1.1/go.mod h1:Si40KT5RWmuoTOv2CaSD+h6Iw7Ck16EUjgfXtTQwDUY= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/99designs/keyring v1.2.1 h1:tYLp1ULvO7i3fI5vE21ReQuj99QFSs7lGm0xWyJo87o= diff --git a/internal/cli/build_gbase8s.go b/internal/cli/build_gbase8s.go new file mode 100644 index 000000000..d24190600 --- /dev/null +++ b/internal/cli/build_gbase8s.go @@ -0,0 +1,7 @@ +//go:build gbase8s + +package cli + +import ( + _ "github.com/golang-migrate/migrate/v4/database/gbase8s" +)