Skip to content

Commit

Permalink
Add data store connect opts. (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 authored Feb 5, 2021
1 parent a775138 commit 695a33b
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 148 deletions.
94 changes: 53 additions & 41 deletions cmd/metal-api/internal/datastore/integer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"go.uber.org/zap"

r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)
Expand Down Expand Up @@ -37,10 +38,11 @@ var (

// IntegerPool manages unique integers
type IntegerPool struct {
tablename string
poolType IntegerPoolType
min uint
max uint
term *r.Term
poolTable *r.Term
infoTable *r.Term
session r.QueryExecutor
}

Expand All @@ -50,16 +52,34 @@ type integer struct {

// integerinfo contains information on the integer pool.
type integerinfo struct {
IsInitialized bool `rethinkdb:"isInitialized" json:"isInitialized"`
ID string `rethinkdb:"id"`
IsInitialized bool `rethinkdb:"isInitialized" json:"isInitialized"`
}

// GetIntegerPool returns a named integerpool if already created
func (rs *RethinkStore) GetIntegerPool(pool IntegerPoolType) (*IntegerPool, error) {
ip, ok := rs.integerPools[pool]
if !ok {
return nil, fmt.Errorf("no integerpool for %s created", pool)
func (rs *RethinkStore) GetVRFPool() *IntegerPool {
return &IntegerPool{
poolType: VRFIntegerPool,
session: rs.session,
min: VRFPoolRangeMin,
max: VRFPoolRangeMax,
poolTable: rs.vrfTable(),
infoTable: rs.vrfInfoTable(),
}
}

func (rs *RethinkStore) GetASNPool() *IntegerPool {
return &IntegerPool{
poolType: ASNIntegerPool,
session: rs.session,
min: ASNPoolRangeMin,
max: ASNPoolRangeMax,
poolTable: rs.asnTable(),
infoTable: rs.asnInfoTable(),
}
return ip, nil
}

func (ip *IntegerPool) String() string {
return ip.poolType.String()
}

// initIntegerPool initializes a pool to acquire unique integers from.
Expand Down Expand Up @@ -93,47 +113,39 @@ func (rs *RethinkStore) GetIntegerPool(pool IntegerPoolType) (*IntegerPool, erro
// - releasing the integer is fast
// - you do not have gaps (because you can give the integers back to the pool)
// - everything can be done atomically, so there are no race conditions
func (rs *RethinkStore) initIntegerPool(pool IntegerPoolType, min, max uint) (*IntegerPool, error) {
var result integerinfo
tablename := pool.String()
err := rs.findEntityByID(rs.integerInfoTable(tablename), &result, tablename)
if err != nil {
if !metal.IsNotFound(err) {
return nil, err
}
func (ip *IntegerPool) initIntegerPool(log *zap.SugaredLogger) error {
var info integerinfo
err := ip.infoTable.ReadOne(&info, ip.session)
if err != nil && err != r.ErrEmptyResult {
return err
}

ip := &IntegerPool{
tablename: tablename,
min: min,
max: max,
session: rs.session,
term: rs.integerTable(tablename),
log.Infow("pool info", "id", ip.String(), "info", info)
if info.IsInitialized {
return nil
}
rs.integerPools[pool] = ip
rs.SugaredLogger.Infow("pool info", "table", tablename, "info", result)
if result.IsInitialized {
return ip, nil

log.Infow("initializing integer pool", "for", ip.String(), "RangeMin", ip.min, "RangeMax", ip.max)
intRange := makeRange(ip.min, ip.max)
_, err = ip.poolTable.Insert(intRange).RunWrite(ip.session, r.RunOpts{ArrayLimit: ip.max})
if err != nil {
return err
}

rs.SugaredLogger.Infow("Initializing integer pool", "for", tablename, "RangeMin", min, "RangeMax", max)
intRange := makeRange(min, max)
_, err = rs.integerTable(tablename).Insert(intRange).RunWrite(rs.session, r.RunOpts{ArrayLimit: max})
_, err = ip.infoTable.Insert(integerinfo{
ID: ip.String(),
IsInitialized: true,
}).RunWrite(ip.session)
if err != nil {
return nil, err
return err
}
_, err = rs.integerInfoTable(tablename).Insert(map[string]interface{}{"id": tablename, "IsInitialized": true}).RunWrite(rs.session)
return ip, err
}

func (ip *IntegerPool) RenewSession(term *r.Term, session r.QueryExecutor) {
ip.term = term
ip.session = session
return nil
}

// AcquireRandomUniqueInteger returns a random unique integer from the pool.
func (ip *IntegerPool) AcquireRandomUniqueInteger() (uint, error) {
t := ip.term.Limit(1)
t := ip.poolTable.Limit(1)
return ip.genericAcquire(&t)
}

Expand All @@ -143,7 +155,7 @@ func (ip *IntegerPool) AcquireUniqueInteger(value uint) (uint, error) {
if err != nil {
return 0, err
}
t := ip.term.Get(value)
t := ip.poolTable.Get(value)
return ip.genericAcquire(&t)
}

Expand All @@ -157,7 +169,7 @@ func (ip *IntegerPool) ReleaseUniqueInteger(id uint) error {
i := integer{
ID: id,
}
_, err = ip.term.Insert(i, r.InsertOpts{Conflict: "replace"}).RunWrite(ip.session)
_, err = ip.poolTable.Insert(i, r.InsertOpts{Conflict: "replace"}).RunWrite(ip.session)
if err != nil {
return err
}
Expand All @@ -172,7 +184,7 @@ func (ip *IntegerPool) genericAcquire(term *r.Term) (uint, error) {
}

if len(res.Changes) == 0 {
res, err := ip.term.Count().Run(ip.session)
res, err := ip.poolTable.Count().Run(ip.session)
if err != nil {
return 0, err
}
Expand Down
24 changes: 10 additions & 14 deletions cmd/metal-api/internal/datastore/integer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ func TestRethinkStore_ReleaseUniqueInteger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
ip := rs.GetVRFPool()
if tt.requiresMock {
if tt.err != nil {
mock.On(r.DB("mockdb").Table(ip.tablename).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(nil, tt.err)
mock.On(r.DB("mockdb").Table(ip.String()).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(nil, tt.err)
} else {
mock.On(r.DB("mockdb").Table(ip.tablename).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(r.
mock.On(r.DB("mockdb").Table(ip.String()).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(r.
WriteResponse{Changes: []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(
tt.value)}}}}, tt.err)
}
Expand All @@ -112,10 +111,9 @@ func TestRethinkStore_ReleaseUniqueInteger(t *testing.T) {

func TestRethinkStore_AcquireRandomUniqueInteger(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
ip := rs.GetVRFPool()
changes := []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(VRFPoolRangeMin)}}}
mock.On(r.DB("mockdb").Table(ip.tablename).Limit(1).Delete(r.
mock.On(r.DB("mockdb").Table(ip.String()).Limit(1).Delete(r.
DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes}, nil)

got, err := ip.AcquireRandomUniqueInteger()
Expand Down Expand Up @@ -149,13 +147,12 @@ func TestRethinkStore_AcquireUniqueInteger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
ip := rs.GetVRFPool()

if tt.requiresMock {
changes := []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(
tt.value)}}}
mock.On(r.DB("mockdb").Table(ip.tablename).Get(tt.value).Delete(r.
mock.On(r.DB("mockdb").Table(ip.String()).Get(tt.value).Delete(r.
DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes}, tt.err)
}

Expand Down Expand Up @@ -216,10 +213,9 @@ func TestRethinkStore_genericAcquire(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
ip := rs.GetVRFPool()

term := rs.integerTable(ip.tablename).Get(tt.value)
term := ip.poolTable.Get(tt.value)
if tt.requiresMock {
var changes []r.ChangeResponse
if tt.tableChanges {
Expand All @@ -229,7 +225,7 @@ func TestRethinkStore_genericAcquire(t *testing.T) {
mock.On(term.Delete(r.DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes},
tt.runWriteErr)
if tt.requiresCountMock {
mock.On(rs.integerTable(ip.tablename).Count()).Return(int64(0), nil)
mock.On(ip.poolTable.Count()).Return(int64(0), nil)
}
}

Expand Down
42 changes: 22 additions & 20 deletions cmd/metal-api/internal/datastore/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ type RethinkStore struct {
session r.QueryExecutor
dbsession *r.Session

dbname string
dbuser string
dbpass string
dbhost string
integerPools map[IntegerPoolType]*IntegerPool
dbname string
dbuser string
dbpass string
dbhost string
}

// New creates a new rethink store.
Expand All @@ -43,7 +42,6 @@ func New(log *zap.Logger, dbhost string, dbname string, dbuser string, dbpass st
dbname: dbname,
dbuser: dbuser,
dbpass: dbpass,
integerPools: make(map[IntegerPoolType]*IntegerPool),
}
}

Expand All @@ -66,13 +64,15 @@ func (rs *RethinkStore) Health() error {
)
}

// Initialize initializes the database, it should be called every time
// the application comes up before using the data store
// Initialize initializes the database, it should be called before serving the metal-api
// in order to ensure that tables, pools, permissions are properly initialized
func (rs *RethinkStore) Initialize() error {
return rs.initializeTables(r.TableCreateOpts{Shards: 1, Replicas: 1})
}

func (rs *RethinkStore) initializeTables(opts r.TableCreateOpts) error {
rs.Info("starting database init")

db := rs.db()

err := multi(rs.session,
Expand Down Expand Up @@ -119,17 +119,15 @@ func (rs *RethinkStore) initializeTables(opts r.TableCreateOpts) error {
}

// integer pools
vrfPool, err := rs.initIntegerPool(VRFIntegerPool, VRFPoolRangeMin, VRFPoolRangeMax)
err = rs.GetVRFPool().initIntegerPool(rs.SugaredLogger)
if err != nil {
return err
}
rs.integerPools[VRFIntegerPool] = vrfPool

asnPool, err := rs.initIntegerPool(ASNIntegerPool, ASNPoolRangeMin, ASNPoolRangeMax)
err = rs.GetASNPool().initIntegerPool(rs.SugaredLogger)
if err != nil {
return err
}
rs.integerPools[ASNIntegerPool] = asnPool

rs.Info("database init complete")

Expand Down Expand Up @@ -168,12 +166,20 @@ func (rs *RethinkStore) ipTable() *r.Term {
res := r.DB(rs.dbname).Table("ip")
return &res
}
func (rs *RethinkStore) integerTable(tablename string) *r.Term {
res := r.DB(rs.dbname).Table(tablename)
func (rs *RethinkStore) asnTable() *r.Term {
res := r.DB(rs.dbname).Table(ASNIntegerPool.String())
return &res
}
func (rs *RethinkStore) asnInfoTable() *r.Term {
res := r.DB(rs.dbname).Table(ASNIntegerPool.String() + "info")
return &res
}
func (rs *RethinkStore) integerInfoTable(tablename string) *r.Term {
res := r.DB(rs.dbname).Table(tablename + "info")
func (rs *RethinkStore) vrfTable() *r.Term {
res := r.DB(rs.dbname).Table(VRFIntegerPool.String())
return &res
}
func (rs *RethinkStore) vrfInfoTable() *r.Term {
res := r.DB(rs.dbname).Table(VRFIntegerPool.String() + "info")
return &res
}
func (rs *RethinkStore) migrationTable() *r.Term {
Expand Down Expand Up @@ -229,10 +235,6 @@ func (rs *RethinkStore) Demote() error {
rs.dbsession = retryConnect(rs.SugaredLogger, []string{rs.dbhost}, rs.dbname, DemotedUser, rs.dbpass)
rs.session = rs.dbsession

for name, pool := range rs.integerPools {
pool.RenewSession(rs.integerTable(name.String()), rs.session)
}

rs.Info("rethinkstore connected with demoted user")
return nil
}
Expand Down
1 change: 0 additions & 1 deletion cmd/metal-api/internal/datastore/rethinkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var (
dbname: "dbname",
dbuser: "dbuser",
dbpass: "password",
integerPools: make(map[IntegerPoolType]*IntegerPool),
}
)

Expand Down
6 changes: 0 additions & 6 deletions cmd/metal-api/internal/datastore/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ func InitMockDB() (*RethinkStore, *r.Mock) {
"db-password",
)
mock := rs.Mock()
vrfterm := rs.integerTable(VRFIntegerPool.String())
asnterm := rs.integerTable(VRFIntegerPool.String())
vrfPool := IntegerPool{tablename: VRFIntegerPool.String(), min: VRFPoolRangeMin, max: VRFPoolRangeMax, term: vrfterm, session: rs.session}
asnPool := IntegerPool{tablename: ASNIntegerPool.String(), min: ASNPoolRangeMin, max: ASNPoolRangeMax, term: asnterm, session: rs.session}
rs.integerPools[VRFIntegerPool] = &vrfPool
rs.integerPools[ASNIntegerPool] = &asnPool
return rs, mock
}

Expand Down
12 changes: 2 additions & 10 deletions cmd/metal-api/internal/service/asn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ const (

// acquireASN fetches a unique integer by using the existing integer pool and adding to ASNBase
func acquireASN(ds *datastore.RethinkStore) (*uint32, error) {
asnPool, err := ds.GetIntegerPool(datastore.ASNIntegerPool)
if err != nil {
return nil, err
}
i, err := asnPool.AcquireRandomUniqueInteger()
i, err := ds.GetASNPool().AcquireRandomUniqueInteger()
if err != nil {
return nil, err
}
Expand All @@ -43,9 +39,5 @@ func releaseASN(ds *datastore.RethinkStore, asn uint32) error {
}
i := uint(asn - ASNBase)

asnPool, err := ds.GetIntegerPool(datastore.ASNIntegerPool)
if err != nil {
return err
}
return asnPool.ReleaseUniqueInteger(i)
return ds.GetASNPool().ReleaseUniqueInteger(i)
}
Loading

0 comments on commit 695a33b

Please sign in to comment.