Skip to content

Commit

Permalink
Migrate ipam to use the grpc service (#522)
Browse files Browse the repository at this point in the history
  • Loading branch information
majst01 authored Jul 3, 2024
1 parent a01e40a commit 6028f51
Show file tree
Hide file tree
Showing 40 changed files with 604 additions and 1,077 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
cache: false

- name: Lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
args: --build-tags integration -p bugs -p unused -D protogetter --timeout=5m

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release-drafter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: release-drafter/release-drafter@v5
- uses: release-drafter/release-drafter@v6
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
14 changes: 9 additions & 5 deletions cmd/metal-api/internal/datastore/integer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package datastore
import (
"context"
"log/slog"
"os"
"sync"
"time"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-api/test"
Expand All @@ -25,7 +25,9 @@ func TestRethinkStore_AcquireRandomUniqueIntegerIntegration(t *testing.T) {
_ = container.Terminate(context.Background())
}()

rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

rs := New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
rs.ASNPoolRangeMin = 10000
Expand All @@ -49,8 +51,9 @@ func TestRethinkStore_AcquireUniqueIntegerTwiceIntegration(t *testing.T) {
defer func() {
_ = container.Terminate(context.Background())
}()
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs := New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
rs.ASNPoolRangeMin = 10000
Expand All @@ -77,7 +80,9 @@ func TestRethinkStore_AcquireUniqueIntegerPoolExhaustionIntegration(t *testing.T
_ = container.Terminate(context.Background())
}()

rs := New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

rs := New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
rs.ASNPoolRangeMin = 10000
Expand All @@ -101,7 +106,6 @@ func TestRethinkStore_AcquireUniqueIntegerPoolExhaustionIntegration(t *testing.T
}
assert.GreaterOrEqual(t, got, uint(rs.VRFPoolRangeMin))
assert.LessOrEqual(t, got, uint(rs.VRFPoolRangeMax))
t.Logf("acquired a vrf %d at: %s", got, time.Now())
}()
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/metal-api/internal/datastore/machine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package datastore
import (
"context"
"log/slog"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -947,7 +948,7 @@ func TestRethinkStore_UpdateMachine(t *testing.T) {
func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {

var (
root = slog.Default()
root = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
wg sync.WaitGroup
size = metal.Size{Base: metal.Base{ID: "1"}}
count int
Expand Down Expand Up @@ -1023,7 +1024,7 @@ func Test_FindWaitingMachine_NoConcurrentModificationErrors(t *testing.T) {
continue
}

log.Info("waiting machine found")
log.Debug("waiting machine found")

newMachine := *machine
newMachine.PreAllocated = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package migrations_integration
import (
"context"
"log/slog"
"os"
"time"

"github.com/google/go-cmp/cmp"
Expand All @@ -28,7 +29,9 @@ func Test_Migration(t *testing.T) {
_ = container.Terminate(context.Background())
}()

rs := datastore.New(slog.Default(), c.IP+":"+c.Port, c.DB, c.User, c.Password)
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

rs := datastore.New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
rs.ASNPoolRangeMin = 10000
Expand Down
82 changes: 59 additions & 23 deletions cmd/metal-api/internal/datastore/network.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package datastore

import (
"errors"
"fmt"
"net/netip"
"strconv"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/utils"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

Expand All @@ -24,8 +26,28 @@ type NetworkSearchQuery struct {
Labels map[string]string `json:"labels" optional:"true"`
}

func (p *NetworkSearchQuery) Validate() error {
var errs []error
for _, prefix := range p.Prefixes {
_, err := netip.ParsePrefix(prefix)
if err != nil {
errs = append(errs, err)
}
}
for _, prefix := range p.DestinationPrefixes {
_, err := netip.ParsePrefix(prefix)
if err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return errors.Join(errs...)
}

// GenerateTerm generates the project search query term.
func (p *NetworkSearchQuery) generateTerm(rs *RethinkStore) *r.Term {
func (p *NetworkSearchQuery) generateTerm(rs *RethinkStore) (*r.Term, error) {
q := *rs.networkTable()

if p.ID != nil {
Expand Down Expand Up @@ -91,42 +113,48 @@ func (p *NetworkSearchQuery) generateTerm(rs *RethinkStore) *r.Term {
}

for _, prefix := range p.Prefixes {
ip, length := utils.SplitCIDR(prefix)
pfx, err := netip.ParsePrefix(prefix)
if err != nil {
return nil, fmt.Errorf("unable to parse prefix %w", err)
}
ip := pfx.Addr()
length := pfx.Bits()

q = q.Filter(func(row r.Term) r.Term {
return row.Field("prefixes").Map(func(p r.Term) r.Term {
return p.Field("ip")
}).Contains(r.Expr(ip))
}).Contains(r.Expr(ip.String()))
})

if length != nil {
q = q.Filter(func(row r.Term) r.Term {
return row.Field("prefixes").Map(func(p r.Term) r.Term {
return p.Field("length")
}).Contains(r.Expr(strconv.Itoa(*length)))
})
}
q = q.Filter(func(row r.Term) r.Term {
return row.Field("prefixes").Map(func(p r.Term) r.Term {
return p.Field("length")
}).Contains(r.Expr(strconv.Itoa(length)))
})
}

for _, destPrefix := range p.DestinationPrefixes {
ip, length := utils.SplitCIDR(destPrefix)
pfx, err := netip.ParsePrefix(destPrefix)
if err != nil {
return nil, fmt.Errorf("unable to parse prefix %w", err)
}
ip := pfx.Addr()
length := pfx.Bits()

q = q.Filter(func(row r.Term) r.Term {
return row.Field("destinationprefixes").Map(func(dp r.Term) r.Term {
return dp.Field("ip")
}).Contains(r.Expr(ip))
}).Contains(r.Expr(ip.String()))
})

if length != nil {
q = q.Filter(func(row r.Term) r.Term {
return row.Field("destinationprefixes").Map(func(dp r.Term) r.Term {
return dp.Field("length")
}).Contains(r.Expr(strconv.Itoa(*length)))
})
}
q = q.Filter(func(row r.Term) r.Term {
return row.Field("destinationprefixes").Map(func(dp r.Term) r.Term {
return dp.Field("length")
}).Contains(r.Expr(strconv.Itoa(length)))
})
}

return &q
return &q, nil
}

// FindNetworkByID returns an network of a given id.
Expand All @@ -141,12 +169,20 @@ func (rs *RethinkStore) FindNetworkByID(id string) (*metal.Network, error) {

// FindNetwork returns a machine by the given query, fails if there is no record or multiple records found.
func (rs *RethinkStore) FindNetwork(q *NetworkSearchQuery, n *metal.Network) error {
return rs.findEntity(q.generateTerm(rs), &n)
term, err := q.generateTerm(rs)
if err != nil {
return err
}
return rs.findEntity(term, &n)
}

// SearchNetworks returns the networks that match the given properties
func (rs *RethinkStore) SearchNetworks(q *NetworkSearchQuery, ns *metal.Networks) error {
return rs.searchEntities(q.generateTerm(rs), ns)
term, err := q.generateTerm(rs)
if err != nil {
return err
}
return rs.searchEntities(term, ns)
}

// ListNetworks returns all networks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func startRethinkInitialized() (container testcontainers.Container, ds *RethinkS
panic(err)
}

rs := New(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})), c.IP+":"+c.Port, c.DB, c.User, c.Password)
rs := New(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})), c.IP+":"+c.Port, c.DB, c.User, c.Password)

rs.VRFPoolRangeMin = 10000
rs.VRFPoolRangeMax = 10010
Expand Down
3 changes: 2 additions & 1 deletion cmd/metal-api/internal/datastore/rethinkdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datastore

import (
"log/slog"
"os"
"testing"

"github.com/google/go-cmp/cmp"
Expand All @@ -11,7 +12,7 @@ import (
)

func TestNew(t *testing.T) {
logger := slog.Default()
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
type args struct {
log *slog.Logger
dbhost string
Expand Down
3 changes: 2 additions & 1 deletion cmd/metal-api/internal/datastore/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datastore

import (
"log/slog"
"os"
"testing"

r "gopkg.in/rethinkdb/rethinkdb-go.v6"
Expand All @@ -20,7 +21,7 @@ Return Values:
*/
func InitMockDB(t *testing.T) (*RethinkStore, *r.Mock) {
rs := New(
slog.Default(),
slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})),
"db-addr",
"mockdb",
"db-user",
Expand Down
3 changes: 2 additions & 1 deletion cmd/metal-api/internal/eventbus/nsq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventbus

import (
"log/slog"
"os"
"testing"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
Expand All @@ -15,7 +16,7 @@ func TestNewNSQ(t *testing.T) {
HTTPEndpoint: "rest",
}
publisher := bus.NewPublisher
logger := slog.Default()
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
actual := NewNSQ(cfg, logger, publisher)

assert.NotNil(t, actual)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log/slog"
"math/rand/v2"
"os"
"strconv"
"sync"
"testing"
Expand Down Expand Up @@ -220,7 +221,7 @@ func (t *test) startApiInstances(ds *datastore.RethinkStore) {
cfg := &ServerConfig{
Context: ctx,
Store: ds,
Logger: slog.Default(),
Logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})),
GrpcPort: 50005 + i,
TlsEnabled: false,
ResponseInterval: 2 * time.Millisecond,
Expand Down
9 changes: 7 additions & 2 deletions cmd/metal-api/internal/grpc/boot-service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"log/slog"
"os"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -33,6 +34,8 @@ func (p *emptyPublisher) CreateTopic(topic string) error {

func (p *emptyPublisher) Stop() {}
func TestBootService_Register(t *testing.T) {
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

tests := []struct {
name string
uuid string
Expand Down Expand Up @@ -162,7 +165,7 @@ func TestBootService_Register(t *testing.T) {
}

bootService := &BootService{
log: slog.Default(),
log: log,
ds: ds,
ipmiSuperUser: metal.DisabledIPMISuperUser(),
publisher: &emptyPublisher{},
Expand Down Expand Up @@ -193,6 +196,8 @@ func TestBootService_Register(t *testing.T) {
}

func TestBootService_Report(t *testing.T) {
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

tests := []struct {
name string
req *v1.BootServiceReportRequest
Expand Down Expand Up @@ -228,7 +233,7 @@ func TestBootService_Report(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
b := &BootService{
log: slog.Default(),
log: log,
ds: ds,
ipmiSuperUser: metal.DisabledIPMISuperUser(),
publisher: &emptyPublisher{},
Expand Down
4 changes: 3 additions & 1 deletion cmd/metal-api/internal/grpc/event-service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"log/slog"
"os"
"reflect"
"testing"

Expand All @@ -16,6 +17,7 @@ import (
func TestEventService_Send(t *testing.T) {
ds, mock := datastore.InitMockDB(t)
testdata.InitMockDBData(mock)
log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))

tests := []struct {
name string
Expand All @@ -36,7 +38,7 @@ func TestEventService_Send(t *testing.T) {
},
},
ds: ds,
log: slog.Default(),
log: log,
want: &v1.EventServiceSendResponse{
Events: uint64(1),
Failed: []string{},
Expand Down
Loading

0 comments on commit 6028f51

Please sign in to comment.