Skip to content

Commit

Permalink
Introduce rethinkdb migration mechanism. (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 authored Jan 20, 2021
1 parent 24fc80d commit f30c7f8
Show file tree
Hide file tree
Showing 10 changed files with 538 additions and 48 deletions.
5 changes: 5 additions & 0 deletions cmd/metal-api/internal/datastore/integer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func (rs *RethinkStore) initIntegerPool(pool IntegerPoolType, min, max uint) (*I
return ip, err
}

func (ip *IntegerPool) RenewSession(term *r.Term, session r.QueryExecutor) {
ip.term = term
ip.session = session
}

// AcquireRandomUniqueInteger returns a random unique integer from the pool.
func (ip *IntegerPool) AcquireRandomUniqueInteger() (uint, error) {
t := ip.term.Limit(1)
Expand Down
159 changes: 159 additions & 0 deletions cmd/metal-api/internal/datastore/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package datastore

import (
"fmt"
"sync"

"github.com/pkg/errors"
"go4.org/sort"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

// MigrateFunc is a function that contains database migration logic
type MigrateFunc func(db *r.Term, session r.QueryExecutor, rs *RethinkStore) error

// Migrations is a list of migrations
type Migrations []Migration

// Migration defines a database migration
type Migration struct {
Name string
Version int
Up MigrateFunc
}

// MigrationVersionEntry is a version entry in the migration database
type MigrationVersionEntry struct {
Version int `rethinkdb:"id"`
Name string `rethinkdb:"name"`
}

var (
migrations Migrations
migrationRegisterLock sync.Mutex
)

// MustRegisterMigration registers a migration and panics when a problem occurs
func MustRegisterMigration(m Migration) {
if m.Version < 1 {
panic(fmt.Sprintf("migrations should start from version number '1', but found version %q", m.Version))
}
migrationRegisterLock.Lock()
defer migrationRegisterLock.Unlock()
for _, migration := range migrations {
if migration.Version == m.Version {
panic(fmt.Sprintf("migration with version %d is defined multiple times", m.Version))
}
}
migrations = append(migrations, m)
}

// Between returns a sorted slice of migrations that are between the given current version
// and target version (target version contained). If target version is nil all newer versions
// than current are contained in the slice.
func (ms Migrations) Between(current int, target *int) (Migrations, error) {
var result Migrations
targetFound := false
for _, m := range ms {
if target != nil {
if m.Version > *target {
continue
}
if m.Version == *target {
targetFound = true
}
}

if m.Version <= current {
continue
}

result = append(result, m)
}

sort.Slice(result, func(i, j int) bool {
return result[i].Version < result[j].Version
})

if target != nil && !targetFound {
return nil, fmt.Errorf("target version not found")
}

return result, nil
}

// Migrate runs database migrations and puts the database into read only mode for demoted runtime users.
func (rs *RethinkStore) Migrate(targetVersion *int, dry bool) error {
_, err := rs.migrationTable().Insert(MigrationVersionEntry{Version: 0}, r.InsertOpts{
Conflict: "replace",
}).RunWrite(rs.session)
if err != nil {
return err
}

results, err := rs.migrationTable().Max().Run(rs.session)
if err != nil {
return err
}
defer results.Close()

var current MigrationVersionEntry
err = results.One(&current)
if err != nil {
return err
}

if targetVersion != nil && *targetVersion < current.Version {
return fmt.Errorf("target version (=%d) smaller than current version (=%d) and down migrations not supported", *targetVersion, current.Version)
}
ms, err := migrations.Between(current.Version, targetVersion)
if err != nil {
return err
}

if len(ms) == 0 {
rs.SugaredLogger.Infow("no database migration required", "current-version", current.Version)
return nil
}

rs.SugaredLogger.Infow("database migration required", "current-version", current.Version, "newer-versions", len(ms), "target-version", ms[len(ms)-1].Version)

if dry {
for _, m := range ms {
rs.SugaredLogger.Infow("database migration dry run", "version", m.Version, "name", m.Name)
}
return nil
}

rs.SugaredLogger.Infow("setting demoted runtime user to read only", "user", DemotedUser)
_, err = rs.db().Grant(DemotedUser, map[string]interface{}{"read": true, "write": false}).RunWrite(rs.session)
if err != nil {
return err
}
defer func() {
rs.SugaredLogger.Infow("removing read only", "user", DemotedUser)
_, err = rs.db().Grant(DemotedUser, map[string]interface{}{"read": true, "write": true}).RunWrite(rs.session)
if err != nil {
rs.SugaredLogger.Errorw("error giving back write permissions", "user", DemotedUser)
}
}()

for _, m := range ms {
rs.SugaredLogger.Infow("running database migration", "version", m.Version, "name", m.Name)
err = m.Up(rs.db(), rs.session, rs)
if err != nil {
return errors.Wrap(err, "error running database migration")
}

_, err := rs.migrationTable().Insert(MigrationVersionEntry{Version: m.Version, Name: m.Name}, r.InsertOpts{
Conflict: "replace",
}).RunWrite(rs.session)
if err != nil {
return errors.Wrap(err, "error updating database migration version")
}
}

rs.SugaredLogger.Infow("database migration succeeded")

return nil
}
167 changes: 167 additions & 0 deletions cmd/metal-api/internal/datastore/migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package datastore

import (
"reflect"
"testing"
)

func TestMigrations_Between(t *testing.T) {
type args struct {
current int
target *int
}
tests := []struct {
name string
ms Migrations
args args
want Migrations
wantErr bool
}{
{
name: "no migrations is fine",
ms: []Migration{},
args: args{
current: 0,
},
want: nil,
wantErr: false,
},
{
name: "get all migrations from 0, sorted",
ms: []Migration{
{
Name: "migration 4",
Version: 4,
},
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 1",
Version: 1,
},
},
args: args{
current: 0,
},
want: []Migration{
{
Name: "migration 1",
Version: 1,
},
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 4",
Version: 4,
},
},
wantErr: false,
},
{
name: "get all migrations from 1, sorted",
ms: []Migration{
{
Name: "migration 4",
Version: 4,
},
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 1",
Version: 1,
},
},
args: args{
current: 1,
},
want: []Migration{
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 4",
Version: 4,
},
},
wantErr: false,
},
{
name: "get migrations up to target version, sorted",
ms: []Migration{
{
Name: "migration 4",
Version: 4,
},
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 1",
Version: 1,
},
},
args: args{
current: 0,
target: intPtr(2),
},
want: []Migration{
{
Name: "migration 1",
Version: 1,
},
{
Name: "migration 2",
Version: 2,
},
},
wantErr: false,
},
{
name: "error on unknown target version",
ms: []Migration{
{
Name: "migration 4",
Version: 4,
},
{
Name: "migration 2",
Version: 2,
},
{
Name: "migration 1",
Version: 1,
},
},
args: args{
current: 0,
target: intPtr(3),
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.ms.Between(tt.args.current, tt.args.target)
if (err != nil) != tt.wantErr {
t.Errorf("Migrations.Between() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Migrations.Between() = %v, want %v", got, tt.want)
}
})
}
}

func intPtr(i int) *int {
return &i
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package migrations

import (
r "gopkg.in/rethinkdb/rethinkdb-go.v6"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
)

func init() {
datastore.MustRegisterMigration(datastore.Migration{
Name: "remove wait table (not used anymore since grpc wait server was introduced)",
Version: 1,
Up: func(db *r.Term, session r.QueryExecutor, rs *datastore.RethinkStore) error {
res, err := db.TableList().Contains("wait").Run(session)
if err != nil {
return err
}
defer res.Close()

var exists bool
err = res.One(&exists)
if err != nil {
return err
}

if exists {
_, err = db.TableDrop("wait").RunWrite(session)
}
return err
},
})
}
21 changes: 21 additions & 0 deletions cmd/metal-api/internal/datastore/migrations/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Package migrations contains migration functions for migrating the RethinkDB.
//
// Migrating RethinkDB is a bit different than compared to regular SQL databases because
// clients define the schema and not the server.
//
// Currently, migrations are only intended to be run *after* the rollout of the new clients.
// This prevents older clients to write their old schema into the database after the migration
// was applied. This approach allows us to apply zero-downtime migrations for most of the
// use-cases we have seen in the past.
//
// There are probably scenarios where it makes sense to migrate *before* instance
// rollout and stop the instances before the migration (downtime migration) but for now
// this use-case has not been implemented and it possibly requires more difficult
// deployment orchestration to apply a migration.
//
// We also do not support down migrations for the time being because it also makes
// things more complicated than they need to be.
//
// Please ensure that your migrations are idempotent (they need to work for existing and
// for fresh deployments). Check the state before modifying it.
package migrations
Loading

0 comments on commit f30c7f8

Please sign in to comment.