diff --git a/Makefile b/Makefile index b25332452..e2ea94dfc 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ test-unit: .PHONY: test-integration test-integration: - go test -v -tags=integration -timeout 600s -p 1 ./... + go test -v -count=1 -tags=integration -timeout 600s -p 1 ./... .PHONY: check-diff check-diff: spec diff --git a/cmd/metal-api/internal/grpc/boot-service-wait.go b/cmd/metal-api/internal/grpc/boot-service-wait.go index 89fef64ca..42553c63a 100644 --- a/cmd/metal-api/internal/grpc/boot-service-wait.go +++ b/cmd/metal-api/internal/grpc/boot-service-wait.go @@ -94,9 +94,6 @@ func (b *BootService) Wait(req *v1.BootServiceWaitRequest, srv v1.BootService_Wa } func (b *BootService) initWaitEndpoint() error { - if b.publisher == nil || b.consumer == nil { - return nil - } channel := fmt.Sprintf("alloc-%s#ephemeral", uuid.NewString()) return b.consumer.With(bus.LogLevel(bus.Warning)). MustRegister(metal.TopicAllocation.Name, channel). diff --git a/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go b/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go index 238235256..9a1eb27b8 100644 --- a/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go +++ b/cmd/metal-api/internal/grpc/boot-service-wait_integration_test.go @@ -5,343 +5,345 @@ package grpc import ( "context" - "errors" "fmt" - "io" "log/slog" - "math/rand/v2" + "net" "os" + "slices" "strconv" - "sync" "testing" "time" + "github.com/avast/retry-go/v4" "github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore" "github.com/metal-stack/metal-api/cmd/metal-api/internal/metal" v1 "github.com/metal-stack/metal-api/pkg/api/v1" + v1grpc "github.com/metal-stack/metal-api/pkg/grpc" + "github.com/metal-stack/metal-lib/bus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - "github.com/undefinedlabs/go-mpatch" - - r "gopkg.in/rethinkdb/rethinkdb-go.v6" + integrationtest "github.com/metal-stack/metal-api/test" ) -type testCase int +type ( + test struct { + log *slog.Logger -const ( - happyPath testCase = iota - serverFailure - clientFailure -) + ss []*server + cc []*client -type client struct { - *grpc.ClientConn - cancel func() -} + t *testing.T -type server struct { - cancel context.CancelFunc - allocate chan string -} + ds *datastore.RethinkStore + publisher bus.Publisher + consumer *bus.Consumer -type test struct { - *testing.T - ss []*server - cc []*client + numberApiInstances int + numberMachineInstances int + numberAllocations int + } - numberApiInstances int - numberMachineInstances int - numberAllocations int - testCase testCase + client struct { + machineID string + conn *grpc.ClientConn + c v1.BootServiceClient + cancel func() + } - notReadyMachines *sync.WaitGroup - unallocatedMachines *sync.WaitGroup - mtx *sync.Mutex - allocations map[string]bool -} + server struct { + cfg *ServerConfig + cancel context.CancelFunc + allocate chan string + } +) func TestWaitServer(t *testing.T) { - var tt []*test - aa := []int{1, 10} - mm := [][]int{{10, 7}} - for _, a := range aa { - for _, m := range mm { - require.Positive(t, a) - require.Positive(t, m[0]) - require.Positive(t, m[1]) - require.GreaterOrEqual(t, m[0], m[1]) - tt = append(tt, &test{ - numberApiInstances: a, - numberMachineInstances: m[0], - numberAllocations: m[1], - }) - } - } - for _, test := range tt { - test.T = t - test.testCase = happyPath - test.run() - test.testCase = serverFailure - test.run() - test.testCase = clientFailure - test.run() + var ( + log = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) + ctx = context.Background() + ) + + // starting a rethinkdb rethinkContainer + rethinkContainer, c, err := integrationtest.StartRethink(t) + require.NoError(t, err) + defer func() { + _ = rethinkContainer.Terminate(ctx) + }() - } -} + ds := datastore.New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password) + ds.VRFPoolRangeMax = 1000 + ds.ASNPoolRangeMax = 1000 -func (t *test) run() { - defer t.shutdown() + err = ds.Connect() + require.NoError(t, err) + err = ds.Initialize() + require.NoError(t, err) - time.Sleep(20 * time.Millisecond) + // starting nsqd container + nsqContainer, publisher, consumer := integrationtest.StartNsqd(t, slog.Default()) + require.NoError(t, err) + defer func() { + _ = nsqContainer.Terminate(ctx) + }() - t.notReadyMachines = new(sync.WaitGroup) - t.notReadyMachines.Add(t.numberMachineInstances) - t.unallocatedMachines = new(sync.WaitGroup) - t.unallocatedMachines.Add(t.numberAllocations) - t.mtx = new(sync.Mutex) - t.allocations = make(map[string]bool) + te := &test{ + log: log, + t: t, + ds: ds, + publisher: publisher, + consumer: consumer, + numberApiInstances: 3, + numberMachineInstances: 10, + numberAllocations: 7, + } - now := time.Now() - _, _ = mpatch.PatchMethod(time.Now, func() time.Time { - return now - }) + te.run(ctx) +} - wait := make(map[string]bool) +func (te *test) run(ctx context.Context) { + defer te.shutdown() - insertMock := func(w bool, id string) r.Term { - return r.DB("mockdb").Table("machine").Get(id).Replace(func(row r.Term) r.Term { - return r.Branch(row.Field("changed").Eq(r.MockAnything()), metal.Machine{ - Base: metal.Base{ID: id, Changed: now}, - Waiting: w, - }, r.MockAnything()) - }) - } - returnMock := func(w bool, id string) func() []interface{} { - return func() []interface{} { - t.mtx.Lock() - defer t.mtx.Unlock() - wait[id] = w - return []interface{}{r.WriteResponse{}} - } - } + var ( + allocations = make(chan string) + machines []string + ) - ds, mock := datastore.InitMockDB(t.T) - for i := range t.numberMachineInstances { - machineID := strconv.Itoa(i) - mock.On(r.DB("mockdb").Table("machine").Get(machineID)).Return(metal.Machine{Base: metal.Base{ID: machineID}}, nil) - mock.On(insertMock(true, machineID)).Return(returnMock(true, machineID), nil) - mock.On(insertMock(false, machineID)).Return(returnMock(false, machineID), nil) - } + te.startApiInstances(ctx) + te.startMachineInstances(ctx, allocations) - t.startApiInstances(ds) - t.startMachineInstances() - t.notReadyMachines.Wait() + for i := range te.numberAllocations { + client := te.cc[i%len(te.cc)] + te.t.Logf("sending nsq allocation event for machine %s", client.machineID) - require.Len(t, wait, t.numberMachineInstances) - for _, wait := range wait { - require.True(t, wait) - } + err := te.publisher.Publish(metal.TopicAllocation.Name, &metal.AllocationEvent{MachineID: client.machineID}) + require.NoError(te.t, err) - switch t.testCase { - case happyPath: - case serverFailure: - t.notReadyMachines.Add(t.numberMachineInstances) - t.stopApiInstances() - t.startApiInstances(ds) - t.notReadyMachines.Wait() - case clientFailure: - t.notReadyMachines.Add(t.numberMachineInstances) - t.stopMachineInstances() - t.startMachineInstances() - t.notReadyMachines.Wait() + machines = append(machines, client.machineID) } - require.Len(t, wait, t.numberMachineInstances) - for _, wait := range wait { - require.True(t, wait) - } + waitCtx, waitCancel := context.WithTimeout(ctx, 10*time.Second) + defer waitCancel() - t.allocateMachines() + for { + select { + case <-waitCtx.Done(): + te.t.Errorf("not all machines left wait for allocation loop in time") + case machineID := <-allocations: + require.Contains(te.t, machines, machineID, "unexpected machine left wait loop") - t.unallocatedMachines.Wait() + te.t.Logf("machine %q left wait for allocation loop", machineID) - require.Len(t, t.allocations, t.numberAllocations) - for _, allocated := range t.allocations { - require.True(t, allocated) - } + machines = slices.DeleteFunc(machines, func(id string) bool { + return id == machineID + }) + } - require.Len(t, wait, t.numberMachineInstances) - for key, wait := range wait { - require.Equal(t, !containsKey(t.allocations, key), wait) + if len(machines) == 0 { + te.t.Logf("all expected machines left wait for allocation loop") + break + } } -} -func containsKey(m map[string]bool, key string) bool { - for k := range m { - if k == key { - return true + ms, err := te.ds.ListMachines() + require.NoError(te.t, err) + + waiting := 0 + for _, m := range ms { + if m.Waiting { + waiting++ } } - return false -} -func (t *test) shutdown() { - t.stopMachineInstances() - t.stopApiInstances() -} + assert.Equal(te.t, te.numberMachineInstances-te.numberAllocations, waiting) + assert.Len(te.t, ms, te.numberMachineInstances) -func (t *test) stopApiInstances() { - defer func() { - t.ss = t.ss[:0] - }() - for _, s := range t.ss { - s.cancel() - time.Sleep(50 * time.Millisecond) - } + // TODO: we could use the remaining clients which are waiting for testing an "outage scenario", + // i.e. shutting down the grpc-servers, check that the clients do not give up and reconnect after + // bringing the servers back up (and vice versa). } -func (t *test) stopMachineInstances() { - defer func() { - t.cc = t.cc[:0] - }() - for _, c := range t.cc { - c.cancel() - _ = c.Close() - } -} +func (te *test) startApiInstances(ctx context.Context) { + var ( + timeoutCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + g, gCtx = errgroup.WithContext(timeoutCtx) + ) -func (t *test) startApiInstances(ds *datastore.RethinkStore) { - for i := range t.numberApiInstances { - ctx, cancel := context.WithCancel(context.Background()) - allocate := make(chan string) + defer cancel() - cfg := &ServerConfig{ - Context: ctx, - Store: ds, - Logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})), - GrpcPort: 50005 + i, - TlsEnabled: false, - ResponseInterval: 2 * time.Millisecond, - CheckInterval: 1 * time.Hour, + for range te.numberApiInstances { + instanceCtx, cancel := context.WithCancel(ctx) + allocate := make(chan string) - integrationTestAllocator: allocate, - } + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(te.t, err) - t.ss = append(t.ss, &server{ + server := &server{ cancel: cancel, allocate: allocate, - }) + cfg: &ServerConfig{ + Context: instanceCtx, + Store: te.ds, + Logger: te.log.WithGroup("grpc-server"), + Listener: listener, + Publisher: te.publisher, + Consumer: te.consumer, + TlsEnabled: false, + }, + } + + te.ss = append(te.ss, server) + go func() { - err := Run(cfg) - require.NoError(t, err) + addr := server.cfg.Listener.Addr().String() + + te.t.Logf("running grpc server on %s", addr) + + if err := Run(server.cfg); err != nil { + te.t.Logf("error running grpc server: %s", err) + } else { + te.t.Logf("grpc server on %s stopped successfully", addr) + } }() - } -} -func (t *test) startMachineInstances() { - kacp := keepalive.ClientParameters{ - Time: 5 * time.Millisecond, - Timeout: 20 * time.Millisecond, - PermitWithoutStream: true, + g.Go(func() error { + conn, err := server.newClient() + if err != nil { + return err + } + + conn.Connect() + defer conn.Close() + + return retry.Do(func() error { + state := conn.GetState() + if state == connectivity.Ready { + te.t.Logf("grpc server on %s is accepting client connections", server.cfg.Listener.Addr().String()) + return nil + } + return fmt.Errorf("client cannot connect, still in state %s", state) + }, retry.Context(gCtx)) + }) } - opts := []grpc.DialOption{ - grpc.WithKeepaliveParams(kacp), - grpc.WithTransportCredentials(insecure.NewCredentials()), + + if err := g.Wait(); err != nil { + te.t.Errorf("grpc servers did not come up: %s", err) } - for i := range t.numberMachineInstances { - machineID := strconv.Itoa(i) - // golangci-lint has an issue with math/rand/v2 - // here it provides sufficient randomness though because it's not used for cryptographic purposes - port := 50005 + rand.N(t.numberApiInstances) //nolint:gosec - ctx, cancel := context.WithCancel(context.Background()) - conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), opts...) - require.NoError(t, err) - t.cc = append(t.cc, &client{ - ClientConn: conn, - cancel: cancel, + + te.t.Logf("all %d grpc servers started successfully", len(te.ss)) +} + +func (te *test) startMachineInstances(ctx context.Context, allocations chan string) { + var ( + timeoutCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + g, gCtx = errgroup.WithContext(timeoutCtx) + ) + + defer cancel() + + for i := range te.numberMachineInstances { + var ( + machineID = strconv.Itoa(i) + server = te.ss[i%len(te.ss)] + ctx, cancel = context.WithCancel(ctx) + ) + + conn, err := server.newClient() + require.NoError(te.t, err) + + client := &client{ + machineID: machineID, + conn: conn, + c: v1.NewBootServiceClient(conn), + cancel: cancel, + } + + te.cc = append(te.cc, client) + + err = te.ds.CreateMachine(&metal.Machine{ + Base: metal.Base{ + ID: client.machineID, + }, }) + require.NoError(te.t, err) + + te.t.Logf("created machine %q in rethinkdb store", client.machineID) + go func() { - waitClient := v1.NewBootServiceClient(conn) - err := t.waitForAllocation(machineID, waitClient, ctx) + client.conn.Connect() + + err := v1grpc.WaitForAllocation(ctx, te.log.WithGroup("grpc-client").With("machine-id", client.machineID), client.c, client.machineID, 2*time.Second) if err != nil { return } - t.mtx.Lock() - t.allocations[machineID] = true - t.mtx.Unlock() - t.unallocatedMachines.Done() + + allocations <- machineID }() + + g.Go(func() error { + return retry.Do(func() error { + m, err := te.ds.FindMachineByID(client.machineID) + require.NoError(te.t, err) + + if m.Waiting { + return nil + } + + return fmt.Errorf("machine %s is not yet waiting", m.ID) + }, retry.Context(gCtx)) + }) } -} -func (t *test) waitForAllocation(machineID string, c v1.BootServiceClient, ctx context.Context) error { - req := &v1.BootServiceWaitRequest{ - MachineId: machineID, + if err := g.Wait(); err != nil { + te.t.Errorf("grpc clients did not come up: %s", err) } - for { - stream, err := c.Wait(ctx, req) - time.Sleep(5 * time.Millisecond) - if err != nil { - continue - } + te.t.Logf("all %d grpc clients are now waiting", len(te.cc)) +} - receivedResponse := false +func (te *test) shutdown() { + te.stopMachineInstances() + te.stopApiInstances() +} - for { - _, err := stream.Recv() - if errors.Is(err, io.EOF) { - if !receivedResponse { - break - } - return nil - } - if err != nil { - if !receivedResponse { - break - } - if t.testCase == clientFailure { - return err - } - break - } - if !receivedResponse { - receivedResponse = true - t.notReadyMachines.Done() - } - } +func (te *test) stopApiInstances() { + for _, s := range te.ss { + te.t.Logf("stopping grpc server on %s", s.cfg.Listener.Addr().String()) + s.cancel() } + te.ss = nil } -func (t *test) allocateMachines() { - var alreadyAllocated []string - for range t.numberAllocations { - machineID := t.selectMachine(alreadyAllocated) - alreadyAllocated = append(alreadyAllocated, machineID) - t.mtx.Lock() - t.allocations[machineID] = false - t.mtx.Unlock() - t.simulateNsqNotifyAllocated(machineID) +func (te *test) stopMachineInstances() { + for _, c := range te.cc { + te.t.Logf("stopping grpc client for machine %s", c.machineID) + c.cancel() + err := c.conn.Close() + require.NoError(te.t, err, "unable to shutdown grpc client conn") } + te.cc = nil } -func (t *test) selectMachine(except []string) string { - // golangci-lint has an issue with math/rand/v2 - // here it provides sufficient randomness though because it's not used for cryptographic purposes - machineID := strconv.Itoa(rand.N(t.numberMachineInstances)) //nolint:gosec - for _, id := range except { - if id == machineID { - return t.selectMachine(except) - } +func (s *server) newClient() (*grpc.ClientConn, error) { + kacp := keepalive.ClientParameters{ + Time: 1 * time.Second, + Timeout: 500 * time.Millisecond, + PermitWithoutStream: true, + } + opts := []grpc.DialOption{ + grpc.WithKeepaliveParams(kacp), + grpc.WithTransportCredentials(insecure.NewCredentials()), } - return machineID -} -func (t *test) simulateNsqNotifyAllocated(machineID string) { - for _, s := range t.ss { - s.allocate <- machineID + conn, err := grpc.NewClient(s.cfg.Listener.Addr().String(), opts...) + if err != nil { + return nil, err } + + return conn, nil } diff --git a/cmd/metal-api/internal/grpc/grpc-server.go b/cmd/metal-api/internal/grpc/grpc-server.go index 26b21dab6..bb865ed38 100644 --- a/cmd/metal-api/internal/grpc/grpc-server.go +++ b/cmd/metal-api/internal/grpc/grpc-server.go @@ -44,7 +44,7 @@ type ServerConfig struct { Consumer *bus.Consumer Store *datastore.RethinkStore Logger *slog.Logger - GrpcPort int + Listener net.Listener TlsEnabled bool CaCertFile string ServerCertFile string @@ -54,8 +54,6 @@ type ServerConfig struct { BMCSuperUserPasswordFile string Auditing auditing.Auditing IPMISuperUser metal.MachineIPMISuperUser - - integrationTestAllocator chan string } func Run(cfg *ServerConfig) error { @@ -65,6 +63,9 @@ func Run(cfg *ServerConfig) error { if cfg.CheckInterval <= 0 { cfg.CheckInterval = defaultCheckInterval } + if cfg.Publisher == nil || cfg.Consumer == nil { + return fmt.Errorf("nsq publisher and consumer must be specified") + } kaep := keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection @@ -164,21 +165,7 @@ func Run(cfg *ServerConfig) error { v1.RegisterEventServiceServer(grpcServer, eventService) v1.RegisterBootServiceServer(grpcServer, bootService) - // this is only for the integration test of this package - if cfg.integrationTestAllocator != nil { - go func() { - for { - machineID := <-cfg.integrationTestAllocator - bootService.handleAllocation(machineID) - } - }() - } - - addr := fmt.Sprintf(":%d", cfg.GrpcPort) - listener, err := net.Listen("tcp", addr) - if err != nil { - return err - } + listener := cfg.Listener if cfg.TlsEnabled { cert, err := os.ReadFile(cfg.ServerCertFile) diff --git a/cmd/metal-api/internal/service/integration_test.go b/cmd/metal-api/internal/service/integration_test.go index 10a1f16c8..d462d0664 100644 --- a/cmd/metal-api/internal/service/integration_test.go +++ b/cmd/metal-api/internal/service/integration_test.go @@ -10,7 +10,6 @@ import ( "net" "net/http" "net/http/httptest" - "os" "testing" "time" @@ -18,10 +17,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - "github.com/testcontainers/testcontainers-go" - metalgrpc "github.com/metal-stack/metal-api/cmd/metal-api/internal/grpc" - "github.com/metal-stack/metal-api/test" "github.com/metal-stack/metal-lib/bus" "github.com/metal-stack/security" @@ -53,26 +49,14 @@ type testEnv struct { ds *datastore.RethinkStore privateSuperNetwork *v1.NetworkResponse privateNetwork *v1.NetworkResponse - rethinkContainer testcontainers.Container ctx context.Context + listener net.Listener } -func (te *testEnv) teardown() { - _ = te.rethinkContainer.Terminate(te.ctx) -} - -//nolint:deadcode -func createTestEnvironment(t *testing.T) testEnv { +func createTestEnvironment(t *testing.T, log *slog.Logger, ds *datastore.RethinkStore, publisher bus.Publisher, consumer *bus.Consumer) testEnv { ipamer := ipam.InitTestIpam(t) - rethinkContainer, c, err := test.StartRethink(t) - require.NoError(t, err) - log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) - ds := datastore.New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password) - ds.VRFPoolRangeMax = 1000 - ds.ASNPoolRangeMax = 1000 - - err = ds.Connect() + err := ds.Connect() require.NoError(t, err) err = ds.Initialize() require.NoError(t, err) @@ -88,25 +72,29 @@ func createTestEnvironment(t *testing.T) testEnv { }}, nil) mdc := mdm.NewMock(psc, nil, nil, nil) + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + go func() { err := metalgrpc.Run(&metalgrpc.ServerConfig{ Context: context.Background(), Store: ds, - Publisher: NopPublisher{}, + Publisher: publisher, + Consumer: consumer, Logger: log, - GrpcPort: 50005, + Listener: listener, TlsEnabled: false, ResponseInterval: 2 * time.Millisecond, CheckInterval: 1 * time.Hour, }) if err != nil { - t.Fail() + t.Errorf("grpc server stopped unexpectedly: %s", err) } }() hma := security.NewHMACAuth(testUserDirectory.admin.Name, []byte{1, 2, 3}, security.WithUser(testUserDirectory.admin)) usergetter := security.NewCreds(security.WithHMAC(hma)) - machineService, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipamer, mdc, nil, usergetter, 0, nil, metal.DisabledIPMISuperUser()) + machineService, err := NewMachine(log, ds, publisher, bus.NewEndpoints(consumer, publisher), ipamer, mdc, nil, usergetter, 0, nil, metal.DisabledIPMISuperUser()) require.NoError(t, err) imageService := NewImage(log, ds) switchService := NewSwitch(log, ds) @@ -114,7 +102,7 @@ func createTestEnvironment(t *testing.T) testEnv { sizeImageConstraintService := NewSizeImageConstraint(log, ds) networkService := NewNetwork(log, ds, ipamer, mdc) partitionService := NewPartition(log, ds, &emptyPublisher{}) - ipService, err := NewIP(log, ds, bus.DirectEndpoints(), ipamer, mdc) + ipService, err := NewIP(log, ds, bus.NewEndpoints(consumer, publisher), ipamer, mdc) require.NoError(t, err) te := testEnv{ @@ -127,8 +115,8 @@ func createTestEnvironment(t *testing.T) testEnv { machineService: machineService, ipService: ipService, ds: ds, - rethinkContainer: rethinkContainer, ctx: context.TODO(), + listener: listener, } imageID := "test-image-1.0.0" @@ -394,7 +382,7 @@ func (te *testEnv) machineFree(t *testing.T, uuid string, response interface{}) return webRequestDelete(t, te.machineService, &testUserDirectory.admin, &emptyBody{}, "/v1/machine/"+uuid+"/free", response) } -func (te *testEnv) machineWait(uuid string) error { +func (te *testEnv) machineWait(listener net.Listener, uuid string) error { kacp := keepalive.ClientParameters{ Time: 5 * time.Millisecond, Timeout: 20 * time.Millisecond, @@ -405,12 +393,13 @@ func (te *testEnv) machineWait(uuid string) error { grpc.WithTransportCredentials(insecure.NewCredentials()), } - port := 50005 - conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), opts...) + conn, err := grpc.NewClient(listener.Addr().String(), opts...) if err != nil { return err } + conn.Connect() + isWaiting := make(chan bool) go func() { diff --git a/cmd/metal-api/internal/service/machine-service_allocation_test.go b/cmd/metal-api/internal/service/machine-service_allocation_test.go index a32a0314a..95c220eab 100644 --- a/cmd/metal-api/internal/service/machine-service_allocation_test.go +++ b/cmd/metal-api/internal/service/machine-service_allocation_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "log/slog" + "net" "net/http" "net/http/httptest" "os" @@ -20,7 +21,6 @@ import ( "google.golang.org/grpc/credentials/insecure" grpcv1 "github.com/metal-stack/metal-api/pkg/api/v1" - "github.com/testcontainers/testcontainers-go" "github.com/avast/retry-go/v4" "github.com/emicklei/go-restful/v3" @@ -52,18 +52,34 @@ var ( func TestMachineAllocationIntegration(t *testing.T) { machineCount := 30 + log := slog.Default() + + nsqContainer, publisher, consumer := test.StartNsqd(t, log) + rethinkContainer, cd, err := test.StartRethink(t) + require.NoError(t, err) - rethinkContainer, container := setupTestEnvironment(machineCount, t) defer func() { _ = rethinkContainer.Terminate(context.Background()) + _ = nsqContainer.Terminate(context.Background()) }() + rs := datastore.New(log, cd.IP+":"+cd.Port, cd.DB, cd.User, cd.Password) + rs.VRFPoolRangeMax = 1000 + rs.ASNPoolRangeMax = 1000 + + err = rs.Connect() + require.NoError(t, err) + + err = rs.Initialize() + require.NoError(t, err) + + webContainer, listener := setupTestEnvironment(machineCount, t, rs, publisher, consumer) + opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } - port := 50006 - conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), opts...) + conn, err := grpc.NewClient(listener.Addr().String(), opts...) require.NoError(t, err) c := grpcv1.NewBootServiceClient(conn) @@ -118,7 +134,7 @@ func TestMachineAllocationIntegration(t *testing.T) { err := retry.Do( func() error { var err2 error - ma, err2 = allocMachine(container, ar) + ma, err2 = allocMachine(webContainer, ar) if err2 != nil { t.Logf("machine allocation failed, retrying:%v", err2) return err2 @@ -170,7 +186,7 @@ func TestMachineAllocationIntegration(t *testing.T) { func() error { // TODO add switch config in testdata to have switch updates covered var err2 error - _, err2 = freeMachine(container, id) + _, err2 = freeMachine(webContainer, id) if err2 != nil { t.Logf("machine free failed, retrying:%v", err2) return err2 @@ -289,21 +305,9 @@ func createMachineRegisterRequest(i int) *grpcv1.BootServiceRegisterRequest { } } -func setupTestEnvironment(machineCount int, t *testing.T) (testcontainers.Container, *restful.Container) { +func setupTestEnvironment(machineCount int, t *testing.T, ds *datastore.RethinkStore, publisher bus.Publisher, consumer *bus.Consumer) (*restful.Container, net.Listener) { log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) - rethinkContainer, c, err := test.StartRethink(t) - require.NoError(t, err) - - rs := datastore.New(log, c.IP+":"+c.Port, c.DB, c.User, c.Password) - rs.VRFPoolRangeMax = 1000 - rs.ASNPoolRangeMax = 1000 - - err = rs.Connect() - require.NoError(t, err) - err = rs.Initialize() - require.NoError(t, err) - psc := &mdmv1mock.ProjectServiceClient{} psc.On("Get", testifymock.Anything, &mdmv1.ProjectGetRequest{Id: "pr1"}).Return(&mdmv1.ProjectResponse{Project: &mdmv1.Project{}}, nil) mdc := mdm.NewMock(psc, nil, nil, nil) @@ -331,31 +335,35 @@ func setupTestEnvironment(machineCount int, t *testing.T) (testcontainers.Contai metalIPAMer := ipam.New(ipamclient) - createTestdata(machineCount, rs, metalIPAMer, t) + createTestdata(machineCount, ds, metalIPAMer, t) + + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) go func() { err := metalgrpc.Run(&metalgrpc.ServerConfig{ Context: context.Background(), - Store: rs, - Publisher: NopPublisher{}, + Store: ds, + Publisher: publisher, + Consumer: consumer, + Listener: listener, Logger: log, - GrpcPort: 50006, TlsEnabled: false, ResponseInterval: 2 * time.Millisecond, CheckInterval: 1 * time.Hour, }) if err != nil { - t.Fail() + t.Errorf("grpc server shutdown unexpectedly: %s", err) } }() usergetter := security.NewCreds(security.WithHMAC(hma)) - ms, err := NewMachine(log, rs, &emptyPublisher{}, bus.DirectEndpoints(), metalIPAMer, mdc, nil, usergetter, 0, nil, metal.DisabledIPMISuperUser()) + ms, err := NewMachine(log, ds, publisher, bus.NewEndpoints(consumer, publisher), metalIPAMer, mdc, nil, usergetter, 0, nil, metal.DisabledIPMISuperUser()) require.NoError(t, err) container := restful.NewContainer().Add(ms) container.Filter(rest.UserAuth(usergetter, log)) - return rethinkContainer, container + return container, listener } func createTestdata(machineCount int, rs *datastore.RethinkStore, ipamer ipam.IPAMer, t *testing.T) { diff --git a/cmd/metal-api/internal/service/machine-service_integration_test.go b/cmd/metal-api/internal/service/machine-service_integration_test.go index 2fe907959..9533f9cf7 100644 --- a/cmd/metal-api/internal/service/machine-service_integration_test.go +++ b/cmd/metal-api/internal/service/machine-service_integration_test.go @@ -5,7 +5,6 @@ package service import ( "context" - "fmt" "log/slog" "net" "net/http" @@ -29,8 +28,22 @@ import ( ) func TestMachineAllocationIntegrationFullCycle(t *testing.T) { - te := createTestEnvironment(t) - defer te.teardown() + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})) + + rethinkContainer, cd, err := test.StartRethink(t) + require.NoError(t, err) + nsqContainer, publisher, consumer := test.StartNsqd(t, log) + + defer func() { + _ = rethinkContainer.Terminate(context.Background()) + _ = nsqContainer.Terminate(context.Background()) + }() + + ds := datastore.New(log, cd.IP+":"+cd.Port, cd.DB, cd.User, cd.Password) + ds.VRFPoolRangeMax = 1000 + ds.ASNPoolRangeMax = 1000 + + te := createTestEnvironment(t, log, ds, publisher, consumer) // Register a machine mrr := &grpcv1.BootServiceRegisterRequest{ @@ -86,10 +99,11 @@ func TestMachineAllocationIntegrationFullCycle(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - port := 50005 - conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), opts...) + conn, err := grpc.NewClient(te.listener.Addr().String(), opts...) require.NoError(t, err) + conn.Connect() + c := grpcv1.NewBootServiceClient(conn) registeredMachine, err := c.Register(ctx, mrr) @@ -97,7 +111,7 @@ func TestMachineAllocationIntegrationFullCycle(t *testing.T) { require.NotNil(t, registeredMachine) assert.Len(t, mrr.Hardware.Nics, 2) - err = te.machineWait("test-uuid") + err = te.machineWait(te.listener, "test-uuid") require.NoError(t, err) // DB contains at least a machine which is allocatable @@ -137,7 +151,9 @@ func TestMachineAllocationIntegrationFullCycle(t *testing.T) { status = te.machineFree(t, "test-uuid", &v1.MachineResponse{}) require.Equal(t, http.StatusOK, status) - err = te.machineWait("test-uuid") + time.Sleep(1 * time.Second) + + err = te.machineWait(te.listener, "test-uuid") require.NoError(t, err) // DB contains at least a machine which is allocatable @@ -194,6 +210,8 @@ func TestMachineAllocationIntegrationFullCycle(t *testing.T) { status = te.machineFree(t, "test-uuid", &v1.MachineResponse{}) require.Equal(t, http.StatusOK, status) + time.Sleep(1 * time.Second) + // Check on the switch that connections still exists, but filters are nil, // this ensures that the freeMachine call executed and reset the machine<->switch configuration items. status = te.switchGet(t, "test-switch01", &foundSwitch) diff --git a/cmd/metal-api/main.go b/cmd/metal-api/main.go index c5ea61907..d9369b79b 100644 --- a/cmd/metal-api/main.go +++ b/cmd/metal-api/main.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "log/slog" + "net" "net/http" httppprof "net/http/pprof" "os" @@ -973,6 +974,13 @@ func run() error { log.Fatalf("cannot connect to NSQ: %s", err) } + addr := fmt.Sprintf(":%d", viper.GetInt("grpc-port")) + listener, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("cannot create grpc server listener on addr %s: %s", addr, err) + return err + } + go func() { err = grpc.Run(&grpc.ServerConfig{ Context: context.Background(), @@ -980,7 +988,7 @@ func run() error { Consumer: c, Store: ds, Logger: logger, - GrpcPort: viper.GetInt("grpc-port"), + Listener: listener, TlsEnabled: viper.GetBool("grpc-tls-enabled"), CaCertFile: viper.GetString("grpc-ca-cert-file"), ServerCertFile: viper.GetString("grpc-server-cert-file"), @@ -994,7 +1002,7 @@ func run() error { } }() - addr := fmt.Sprintf("%s:%d", viper.GetString("bind-addr"), viper.GetInt("port")) + addr = fmt.Sprintf("%s:%d", viper.GetString("bind-addr"), viper.GetInt("port")) server := &http.Server{ Addr: addr, Handler: restful.DefaultContainer, diff --git a/go.mod b/go.mod index a390480b0..43fb7ce34 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,6 @@ require ( github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.32.0 - github.com/undefinedlabs/go-mpatch v1.0.7 golang.org/x/crypto v0.25.0 golang.org/x/sync v0.8.0 google.golang.org/grpc v1.65.0 diff --git a/go.sum b/go.sum index e858503ec..ecba94279 100644 --- a/go.sum +++ b/go.sum @@ -446,8 +446,6 @@ github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZ github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYgY= github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE= -github.com/undefinedlabs/go-mpatch v1.0.7 h1:943FMskd9oqfbZV0qRVKOUsXQhTLXL0bQTVbQSpzmBs= -github.com/undefinedlabs/go-mpatch v1.0.7/go.mod h1:TyJZDQ/5AgyN7FSLiBJ8RO9u2c6wbtRvK827b6AVqY4= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.37.1-0.20220607072126-8a320890c08d/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= diff --git a/pkg/grpc/wait.go b/pkg/grpc/wait.go new file mode 100644 index 000000000..191972bd1 --- /dev/null +++ b/pkg/grpc/wait.go @@ -0,0 +1,57 @@ +package helper + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "time" + + v1 "github.com/metal-stack/metal-api/pkg/api/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// WaitForAllocation can be used to call the wait method continuously until an allocation was made. +// This is made for the metal-hammer and located here for better testability. +func WaitForAllocation(ctx context.Context, log *slog.Logger, service v1.BootServiceClient, machineID string, timeout time.Duration) error { + req := &v1.BootServiceWaitRequest{ + MachineId: machineID, + } + + for { + stream, err := service.Wait(ctx, req) + if err != nil { + log.Error("failed waiting for allocation", "retry after", timeout, "error", err) + + time.Sleep(timeout) + continue + } + + for { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + log.Info("machine has been requested for allocation", "machineID", machineID) + return nil + } + + if err != nil { + if e, ok := status.FromError(err); ok { + log.Error("got error from wait call", "code", e.Code(), "message", e.Message(), "details", e.Details()) + switch e.Code() { // nolint:exhaustive + case codes.Unimplemented: + return fmt.Errorf("metal-api breaking change detected, rebooting: %w", err) + } + } + + log.Error("failed stream receiving during waiting for allocation", "retry after", timeout, "error", err) + + time.Sleep(timeout) + break + } + + log.Info("wait for allocation...") + } + } +} diff --git a/test/integration.go b/test/integration.go index 9411d019e..ac93a6f39 100644 --- a/test/integration.go +++ b/test/integration.go @@ -2,17 +2,16 @@ package test import ( "context" + "fmt" + "log/slog" "testing" + "github.com/metal-stack/metal-lib/bus" + "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) -func init() { - // prevent testcontainer logging mangle test and benchmark output - // log.SetOutput(io.Discard) -} - type ConnectionDetails struct { Port string IP string @@ -147,3 +146,46 @@ func StartMeilisearch(t testing.TB) (container testcontainers.Container, c *Conn return meiliContainer, conn, err } + +func StartNsqd(t *testing.T, log *slog.Logger) (testcontainers.Container, bus.Publisher, *bus.Consumer) { + ctx := context.Background() + + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "nsqio/nsq:v1.3.0", + ExposedPorts: []string{"4150/tcp", "4151/tcp"}, + WaitingFor: wait.ForAll( + wait.ForListeningPort("4150/tcp"), + wait.ForListeningPort("4151/tcp"), + ), + Cmd: []string{"nsqd"}, + }, + Started: true, + Logger: testcontainers.TestLogger(t), + }) + require.NoError(t, err) + + ip, err := c.Host(ctx) + require.NoError(t, err) + + tcpPort, err := c.MappedPort(ctx, "4150") + require.NoError(t, err) + httpPort, err := c.MappedPort(ctx, "4151") + require.NoError(t, err) + + consumer, err := bus.NewConsumer(log, nil) + require.NoError(t, err) + + tcpAddress := fmt.Sprintf("%s:%d", ip, tcpPort.Int()) + httpAddress := fmt.Sprintf("%s:%d", ip, httpPort.Int()) + + consumer.With(bus.NSQDs(tcpAddress)) + + publisher, err := bus.NewPublisher(log, &bus.PublisherConfig{ + TCPAddress: tcpAddress, + HTTPEndpoint: httpAddress, + }) + require.NoError(t, err) + + return c, publisher, consumer +}