Skip to content

Commit

Permalink
to #246 feat: set server to read-only when backup.
Browse files Browse the repository at this point in the history
  • Loading branch information
TianyuZhang1214 committed Dec 11, 2024
1 parent 6a58f09 commit c155b9d
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 10 deletions.
16 changes: 16 additions & 0 deletions backend/connpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
stdsql "database/sql"
"errors"
"fmt"
"strings"
"sync"

Expand Down Expand Up @@ -176,3 +177,18 @@ func (p *ConnectionPool) Close() error {
}
return errors.Join(lastErr, p.DB.Close())
}

func (p *ConnectionPool) ResetAndStart(catalog string, connector *duckdb.Connector, db *stdsql.DB) error {
err := p.Close()
if err != nil {
return fmt.Errorf("failed to close connection pool: %w", err)
}

p.conns.Clear()
p.txns.Clear()
p.catalog = catalog
p.DB = db
p.connector = connector

return nil
}
29 changes: 29 additions & 0 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type DatabaseProvider struct {
storage *stdsql.DB
catalogName string
dataDir string
dsn string
externalProcedureRegistry sql.ExternalStoredProcedureRegistry
}

Expand All @@ -32,6 +33,8 @@ var _ sql.MutableDatabaseProvider = (*DatabaseProvider)(nil)
var _ sql.ExternalStoredProcedureProvider = (*DatabaseProvider)(nil)
var _ configuration.DataDirProvider = (*DatabaseProvider)(nil)

const readOnlySuffix = "?access_mode=read_only"

func NewInMemoryDBProvider() *DatabaseProvider {
prov, err := NewDBProvider(".", "")
if err != nil {
Expand Down Expand Up @@ -114,6 +117,7 @@ func NewDBProvider(dataDir, dbFile string) (*DatabaseProvider, error) {
storage: storage,
catalogName: name,
dataDir: dataDir,
dsn: dsn,
externalProcedureRegistry: sql.NewExternalStoredProcedureRegistry(), // This has no effect, just to satisfy the upper layer interface
}, nil
}
Expand Down Expand Up @@ -245,3 +249,28 @@ func (prov *DatabaseProvider) DropDatabase(ctx *sql.Context, name string) error

return nil
}

func (prov *DatabaseProvider) Restart(readOnly bool) error {
prov.mu.Lock()
defer prov.mu.Unlock()

err := prov.Close()
if err != nil {
return err
}

dsn := prov.dsn
if readOnly {
dsn += readOnlySuffix
}

connector, err := duckdb.NewConnector(dsn, nil)
if err != nil {
return err
}
storage := stdsql.OpenDB(connector)
prov.connector = connector
prov.storage = storage

return nil
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func main() {
}

pgServer, err := pgserver.NewServer(
provider, pool,
environment.GetAddress(), environment.GetPostgresPort(),
func() *sql.Context {
session := backend.NewSession(memory.NewSession(sql.NewBaseSession(), provider), provider, pool)
Expand Down
44 changes: 39 additions & 5 deletions pgserver/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,35 @@ func (h *ConnectionHandler) executeBackup(backupConfig *BackupConfig) (string, e
return "", fmt.Errorf("failed to create context for query: %w", err)
}

if err := stopReplication(sqlCtx); err != nil {
if err := stopAllReplication(sqlCtx); err != nil {
return "", fmt.Errorf("failed to stop replication: %w", err)
}

if err := doCheckpoint(sqlCtx); err != nil {
return "", fmt.Errorf("failed to do checkpoint: %w", err)
}

return backupConfig.StorageConfig.UploadLocalFile(environment.GetDataDirectory(), environment.GetDbFileName(),
err = h.restartServer(true)
if err != nil {
return "", err
}

msg, err := backupConfig.StorageConfig.UploadLocalFile(environment.GetDataDirectory(), environment.GetDbFileName(),
backupConfig.RemotePath)
if err != nil {
return "", err
}

err = h.restartServer(false)
if err != nil {
return "", fmt.Errorf("backup finished: %s, but failed to restart server: %w", msg, err)
}

if err = startAllReplication(sqlCtx); err != nil {
return "", fmt.Errorf("backup finished: %s, but failed to start replication: %w", msg, err)
}

return msg, nil
}

func doCheckpoint(sqlCtx *sql.Context) error {
Expand All @@ -104,7 +123,7 @@ func doCheckpoint(sqlCtx *sql.Context) error {
return nil
}

func stopReplication(sqlCtx *sql.Context) error {
func stopAllReplication(sqlCtx *sql.Context) error {
err := logrepl.UpdateAllSubscriptionStatus(sqlCtx, false)
if err != nil {
return err
Expand All @@ -113,6 +132,21 @@ func stopReplication(sqlCtx *sql.Context) error {
return logrepl.CommitAndUpdate(sqlCtx)
}

// TODO(neo.zty): add content.
func stopServer() {
func startAllReplication(sqlCtx *sql.Context) error {
err := logrepl.UpdateAllSubscriptionStatus(sqlCtx, true)
if err != nil {
return err
}

return logrepl.CommitAndUpdate(sqlCtx)
}

func (h *ConnectionHandler) restartServer(readOnly bool) error {
provider := h.server.Provider
err := provider.Restart(readOnly)
if err != nil {
return err
}

return h.server.ConnPool.ResetAndStart(provider.CatalogName(), provider.Connector(), provider.Storage())
}
12 changes: 7 additions & 5 deletions pgserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ package pgserver

import (
"fmt"

"github.com/apecloud/myduckserver/backend"
"github.com/apecloud/myduckserver/catalog"
"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
)

type Server struct {
Listener *Listener

Listener *Listener
Provider *catalog.DatabaseProvider
ConnPool *backend.ConnectionPool
NewInternalCtx func() *sql.Context
}

func NewServer(host string, port int, newCtx func() *sql.Context, options ...ListenerOpt) (*Server, error) {
func NewServer(provider *catalog.DatabaseProvider, connPool *backend.ConnectionPool, host string, port int, newCtx func() *sql.Context, options ...ListenerOpt) (*Server, error) {
addr := fmt.Sprintf("%s:%d", host, port)
l, err := server.NewListener("tcp", addr, "")
if err != nil {
Expand All @@ -31,7 +33,7 @@ func NewServer(host string, port int, newCtx func() *sql.Context, options ...Lis
if err != nil {
return nil, err
}
return &Server{Listener: listener, NewInternalCtx: newCtx}, nil
return &Server{Listener: listener, Provider: provider, ConnPool: connPool, NewInternalCtx: newCtx}, nil
}

func (s *Server) Start() {
Expand Down
1 change: 1 addition & 0 deletions pgtest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func CreateTestServer(t *testing.T, port int) (ctx context.Context, pgServer *pg
var connID atomic.Uint32

pgServer, err = pgserver.NewServer(
provider, pool,
"127.0.0.1", port,
func() *sql.Context {
session := backend.NewSession(memory.NewSession(sql.NewBaseSession(), provider), provider, pool)
Expand Down

0 comments on commit c155b9d

Please sign in to comment.