Skip to content

Commit

Permalink
Seperate integer pool (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 authored Sep 24, 2020
1 parent 248d0da commit 9ad3ea4
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 69 deletions.
118 changes: 82 additions & 36 deletions cmd/metal-api/internal/datastore/integer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,60 @@ import (
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

// IntegerPoolType defines the name of the IntegerPool
type IntegerPoolType string

func (p IntegerPoolType) String() string {
return string(p)
}

const (
// VRFIntegerPool defines the name of the pool for VRFs
// this also defines the name of the tables
// FIXME, must be renamed to vrfpool later
VRFIntegerPool IntegerPoolType = "integerpool"
// ASNIntegerPool defines the name of the pool for ASNs
ASNIntegerPool IntegerPoolType = "asnpool"
)

var (
// IntegerPoolRangeMin the minimum integer to get from the pool
IntegerPoolRangeMin = uint(1)
// IntegerPoolRangeMax the maximum integer to get from the pool
IntegerPoolRangeMax = uint(131072)
// VRFPoolRangeMin the minimum integer to get from the vrf pool
VRFPoolRangeMin = uint(1)
// VRFPoolRangeMax the maximum integer to get from the vrf pool
VRFPoolRangeMax = uint(131072)
// ASNPoolRangeMin the minimum integer to get from the asn pool
ASNPoolRangeMin = uint(1)
// ASNPoolRangeMax the maximum integer to get from the asn pool
ASNPoolRangeMax = uint(131072)
)

// IntegerPool manages unique integers
type IntegerPool struct {
tablename string
min uint
max uint
term *r.Term
session r.QueryExecutor
}

type integer struct {
ID uint `rethinkdb:"id" json:"id"`
}

// Integerinfo contains information on the integer pool.
type Integerinfo struct {
// integerinfo contains information on the integer pool.
type integerinfo struct {
IsInitialized bool `rethinkdb:"isInitialized" json:"isInitialized"`
}

// GetIntegerPool returns a named integerpool if already created
func (rs *RethinkStore) GetIntegerPool(pool IntegerPoolType) (*IntegerPool, error) {
ip, ok := rs.integerPools[pool]
if !ok {
return nil, fmt.Errorf("no integerpool for %s created", pool)
}
return ip, nil
}

// initIntegerPool initializes a pool to acquire unique integers from.
// the acquired integers are used from the network service for defining the:
// - vrf name
Expand Down Expand Up @@ -51,72 +89,81 @@ type Integerinfo struct {
// - releasing the integer is fast
// - you do not have gaps (because you can give the integers back to the pool)
// - everything can be done atomically, so there are no race conditions
func (rs *RethinkStore) initIntegerPool() error {

var result Integerinfo
err := rs.findEntityByID(rs.integerInfoTable(), &result, "integerpool")
func (rs *RethinkStore) initIntegerPool(pool IntegerPoolType, min, max uint) (*IntegerPool, error) {
var result integerinfo
tablename := pool.String()
err := rs.findEntityByID(rs.integerInfoTable(tablename), &result, tablename)
if err != nil {
if !metal.IsNotFound(err) {
return err
return nil, err
}
}

ip := &IntegerPool{
tablename: tablename,
min: min,
max: max,
session: rs.session,
term: rs.integerTable(tablename),
}
rs.integerPools[pool] = ip
rs.SugaredLogger.Infow("pool info", "table", tablename, "info", result)
if result.IsInitialized {
return nil
return ip, nil
}

rs.SugaredLogger.Infow("Initializing integer pool", "RangeMin", IntegerPoolRangeMin, "RangeMax", IntegerPoolRangeMax)
intRange := makeRange(IntegerPoolRangeMin, IntegerPoolRangeMax)
_, err = rs.integerTable().Insert(intRange).RunWrite(rs.session, r.RunOpts{ArrayLimit: IntegerPoolRangeMax})
rs.SugaredLogger.Infow("Initializing integer pool", "for", tablename, "RangeMin", min, "RangeMax", max)
intRange := makeRange(min, max)
_, err = rs.integerTable(tablename).Insert(intRange).RunWrite(rs.session, r.RunOpts{ArrayLimit: max})
if err != nil {
return err
return nil, err
}
_, err = rs.integerInfoTable().Insert(map[string]interface{}{"id": "integerpool", "IsInitialized": true}).RunWrite(rs.session)
return err
_, err = rs.integerInfoTable(tablename).Insert(map[string]interface{}{"id": tablename, "IsInitialized": true}).RunWrite(rs.session)
return ip, err
}

// AcquireRandomUniqueInteger returns a random unique integer from the pool.
func (rs *RethinkStore) AcquireRandomUniqueInteger() (uint, error) {
t := rs.integerTable().Limit(1)
return rs.genericAcquire(&t)
func (ip *IntegerPool) AcquireRandomUniqueInteger() (uint, error) {
t := ip.term.Limit(1)
return ip.genericAcquire(&t)
}

// AcquireUniqueInteger returns a unique integer from the pool.
func (rs *RethinkStore) AcquireUniqueInteger(value uint) (uint, error) {
err := verifyRange(value)
func (ip *IntegerPool) AcquireUniqueInteger(value uint) (uint, error) {
err := ip.verifyRange(value)
if err != nil {
return 0, err
}
t := rs.integerTable().Get(value)
return rs.genericAcquire(&t)
t := ip.term.Get(value)
return ip.genericAcquire(&t)
}

// ReleaseUniqueInteger returns a unique integer to the pool.
func (rs *RethinkStore) ReleaseUniqueInteger(id uint) error {
err := verifyRange(id)
func (ip *IntegerPool) ReleaseUniqueInteger(id uint) error {
err := ip.verifyRange(id)
if err != nil {
return err
}

i := integer{
ID: id,
}
_, err = rs.integerTable().Insert(i, r.InsertOpts{Conflict: "replace"}).RunWrite(rs.session)
_, err = ip.term.Insert(i, r.InsertOpts{Conflict: "replace"}).RunWrite(ip.session)
if err != nil {
return err
}

return nil
}

func (rs *RethinkStore) genericAcquire(term *r.Term) (uint, error) {
res, err := term.Delete(r.DeleteOpts{ReturnChanges: true}).RunWrite(rs.session)
func (ip *IntegerPool) genericAcquire(term *r.Term) (uint, error) {
res, err := term.Delete(r.DeleteOpts{ReturnChanges: true}).RunWrite(ip.session)
if err != nil {
return 0, err
}

if len(res.Changes) == 0 {
res, err := rs.integerTable().Count().Run(rs.session)
res, err := ip.term.Count().Run(ip.session)
if err != nil {
return 0, err
}
Expand All @@ -128,9 +175,8 @@ func (rs *RethinkStore) genericAcquire(term *r.Term) (uint, error) {

if count <= 0 {
return 0, metal.Internal(fmt.Errorf("acquisition of a value failed for exhausted pool"), "")
} else {
return 0, metal.Conflict("integer is already acquired by another")
}
return 0, metal.Conflict("integer is already acquired by another")
}

result := uint(res.Changes[0].OldValue.(map[string]interface{})["id"].(float64))
Expand All @@ -147,9 +193,9 @@ func makeRange(min, max uint) []integer {
return a
}

func verifyRange(value uint) error {
if value < IntegerPoolRangeMin || value > IntegerPoolRangeMax {
return fmt.Errorf("value '%d' is outside of the allowed range '%d - %d'", value, IntegerPoolRangeMin, IntegerPoolRangeMax)
func (ip *IntegerPool) verifyRange(value uint) error {
if value < ip.min || value > ip.max {
return fmt.Errorf("value '%d' is outside of the allowed range '%d - %d'", value, ip.min, ip.max)
}
return nil
}
44 changes: 27 additions & 17 deletions cmd/metal-api/internal/datastore/integer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,27 @@ func TestRethinkStore_ReleaseUniqueInteger(t *testing.T) {
{
name: "verify validation of input fails",
value: 524288,
err: fmt.Errorf("value '524288' is outside of the allowed range '%d - %d'", IntegerPoolRangeMin, IntegerPoolRangeMax),
err: fmt.Errorf("value '524288' is outside of the allowed range '%d - %d'", VRFPoolRangeMin, VRFPoolRangeMax),
requiresMock: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
if tt.requiresMock {
if tt.err != nil {
mock.On(r.DB("mockdb").Table("integerpool").Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(nil, tt.err)
mock.On(r.DB("mockdb").Table(ip.tablename).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(nil, tt.err)
} else {
mock.On(r.DB("mockdb").Table("integerpool").Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(r.
WriteResponse{Changes: []r.ChangeResponse{r.ChangeResponse{OldValue: map[string]interface{}{"id": float64(
mock.On(r.DB("mockdb").Table(ip.tablename).Insert(integer{ID: tt.value}, r.InsertOpts{Conflict: "replace"})).Return(r.
WriteResponse{Changes: []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(
tt.value)}}}}, tt.err)
}
}

got := rs.ReleaseUniqueInteger(tt.value)
got := ip.ReleaseUniqueInteger(tt.value)
if tt.err != nil {
assert.EqualError(t, got, tt.err.Error())
} else {
Expand All @@ -110,13 +112,15 @@ func TestRethinkStore_ReleaseUniqueInteger(t *testing.T) {

func TestRethinkStore_AcquireRandomUniqueInteger(t *testing.T) {
rs, mock := InitMockDB()
changes := []r.ChangeResponse{r.ChangeResponse{OldValue: map[string]interface{}{"id": float64(IntegerPoolRangeMin)}}}
mock.On(r.DB("mockdb").Table("integerpool").Limit(1).Delete(r.
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)
changes := []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(VRFPoolRangeMin)}}}
mock.On(r.DB("mockdb").Table(ip.tablename).Limit(1).Delete(r.
DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes}, nil)

got, err := rs.AcquireRandomUniqueInteger()
got, err := ip.AcquireRandomUniqueInteger()
assert.NoError(t, err)
assert.EqualValues(t, IntegerPoolRangeMin, got)
assert.EqualValues(t, VRFPoolRangeMin, got)

mock.AssertExpectations(t)
}
Expand All @@ -137,22 +141,25 @@ func TestRethinkStore_AcquireUniqueInteger(t *testing.T) {
{
name: "verify validation of input fails",
value: 524288,
err: fmt.Errorf("value '524288' is outside of the allowed range '%d - %d'", IntegerPoolRangeMin, IntegerPoolRangeMax),
err: fmt.Errorf("value '524288' is outside of the allowed range '%d - %d'", VRFPoolRangeMin, VRFPoolRangeMax),
requiresMock: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)

if tt.requiresMock {
changes := []r.ChangeResponse{r.ChangeResponse{OldValue: map[string]interface{}{"id": float64(
changes := []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(
tt.value)}}}
mock.On(r.DB("mockdb").Table("integerpool").Get(tt.value).Delete(r.
mock.On(r.DB("mockdb").Table(ip.tablename).Get(tt.value).Delete(r.
DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes}, tt.err)
}

got, err := rs.AcquireUniqueInteger(tt.value)
got, err := ip.AcquireUniqueInteger(tt.value)
if tt.err != nil {
assert.EqualError(t, err, tt.err.Error())
} else {
Expand Down Expand Up @@ -209,21 +216,24 @@ func TestRethinkStore_genericAcquire(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rs, mock := InitMockDB()
term := rs.integerTable().Get(tt.value)
ip, err := rs.GetIntegerPool(VRFIntegerPool)
assert.NoError(t, err)

term := rs.integerTable(ip.tablename).Get(tt.value)
if tt.requiresMock {
var changes []r.ChangeResponse
if tt.tableChanges {
changes = []r.ChangeResponse{r.ChangeResponse{OldValue: map[string]interface{}{"id": float64(
changes = []r.ChangeResponse{{OldValue: map[string]interface{}{"id": float64(
tt.value)}}}
}
mock.On(term.Delete(r.DeleteOpts{ReturnChanges: true})).Return(r.WriteResponse{Changes: changes},
tt.runWriteErr)
if tt.requiresCountMock {
mock.On(rs.integerTable().Count()).Return(int64(0), nil)
mock.On(rs.integerTable(ip.tablename).Count()).Return(int64(0), nil)
}
}

got, err := rs.genericAcquire(&term)
got, err := ip.genericAcquire(&term)
if tt.expectedErr != nil {
assert.EqualError(t, err, tt.expectedErr.Error())
} else {
Expand Down
38 changes: 28 additions & 10 deletions cmd/metal-api/internal/datastore/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (

var (
tables = []string{"image", "size", "partition", "machine", "switch", "wait", "event", "network", "ip",
"integerpool", "integerpoolinfo"}
VRFIntegerPool.String(), VRFIntegerPool.String() + "info",
ASNIntegerPool.String(), ASNIntegerPool.String() + "info"}
)

// A RethinkStore is the database access layer for rethinkdb.
Expand All @@ -24,10 +25,11 @@ type RethinkStore struct {
dbsession *r.Session
database *r.Term

dbname string
dbuser string
dbpass string
dbhost string
dbname string
dbuser string
dbpass string
dbhost string
integerPools map[IntegerPoolType]*IntegerPool
}

// New creates a new rethink store.
Expand All @@ -38,6 +40,7 @@ func New(log *zap.Logger, dbhost string, dbname string, dbuser string, dbpass st
dbname: dbname,
dbuser: dbuser,
dbpass: dbpass,
integerPools: make(map[IntegerPoolType]*IntegerPool),
}
}

Expand Down Expand Up @@ -68,6 +71,14 @@ func (rs *RethinkStore) initializeTables(opts r.TableCreateOpts) error {
db := rs.db()

err := multi(rs.session,
// rename old integerpool to vrfpool
// FIXME enable and remove once migrated
// db.TableList().Contains("integerpool").Do(func(r r.Term) r.Term {
// return db.Table("integerpool").Config().Update(map[string]interface{}{"name": VRFIntegerPoolName})
// }),
// db.TableList().Contains("integerpoolinfo").Do(func(r r.Term) r.Term {
// return db.Table("integerpoolinfo").Config().Update(map[string]interface{}{"name": VRFIntegerPoolName + "info"})
// }),
// create our tables
r.Expr(tables).Difference(db.TableList()).ForEach(func(r r.Term) r.Term {
return db.TableCreate(r, opts)
Expand All @@ -81,10 +92,17 @@ func (rs *RethinkStore) initializeTables(opts r.TableCreateOpts) error {
return err
}

err = rs.initIntegerPool()
vrfPool, err := rs.initIntegerPool(VRFIntegerPool, VRFPoolRangeMin, VRFPoolRangeMax)
if err != nil {
return err
}
rs.integerPools[VRFIntegerPool] = vrfPool

asnPool, err := rs.initIntegerPool(ASNIntegerPool, ASNPoolRangeMin, ASNPoolRangeMax)
if err != nil {
return err
}
rs.integerPools[ASNIntegerPool] = asnPool

return nil
}
Expand Down Expand Up @@ -121,12 +139,12 @@ func (rs *RethinkStore) ipTable() *r.Term {
res := r.DB(rs.dbname).Table("ip")
return &res
}
func (rs *RethinkStore) integerTable() *r.Term {
res := r.DB(rs.dbname).Table("integerpool")
func (rs *RethinkStore) integerTable(tablename string) *r.Term {
res := r.DB(rs.dbname).Table(tablename)
return &res
}
func (rs *RethinkStore) integerInfoTable() *r.Term {
res := r.DB(rs.dbname).Table("integerpoolinfo")
func (rs *RethinkStore) integerInfoTable(tablename string) *r.Term {
res := r.DB(rs.dbname).Table(tablename + "info")
return &res
}
func (rs *RethinkStore) db() *r.Term {
Expand Down
Loading

0 comments on commit 9ad3ea4

Please sign in to comment.