Skip to content

Commit

Permalink
delete headscale node entry when machine is freed (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyMikhalkin authored Oct 19, 2022
1 parent b1760b9 commit 60abf6a
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 44 deletions.
42 changes: 31 additions & 11 deletions cmd/metal-api/internal/headscale/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,28 +119,48 @@ func (h *HeadscaleClient) CreatePreAuthKey(namespace string, expiration time.Tim
return resp.PreAuthKey.Key, nil
}

func (h *HeadscaleClient) DescribeMachine(machineid, projectID string) (connected bool, err error) {
func (h *HeadscaleClient) DescribeMachine(machineID, projectID string) (connected bool, err error) {
machine, err := h.getMachine(machineID, projectID)
if err != nil || machine == nil {
return false, err
}

return machine.LastSeen.AsTime().After(time.Now().Add(-5 * time.Minute)), nil
}

// DeleteMachine removes the node entry from headscale DB
func (h *HeadscaleClient) DeleteMachine(machineID, projectID string) (err error) {
machine, err := h.getMachine(machineID, projectID)
if err != nil || machine == nil {
return err
}

req := &headscalev1.DeleteMachineRequest{
MachineId: machine.Id,
}
if _, err := h.client.DeleteMachine(h.ctx, req); err != nil {
return fmt.Errorf("failed to delete machine: %w", err)
}

return nil
}

func (h *HeadscaleClient) getMachine(machineID, projectID string) (machine *headscalev1.Machine, err error) {
req := &headscalev1.ListMachinesRequest{
Namespace: projectID,
}
resp, err := h.client.ListMachines(h.ctx, req)
if err != nil || resp == nil {
return false, fmt.Errorf("failed to list machines: %w", err)
return nil, fmt.Errorf("failed to list machines: %w", err)
}

for _, m := range resp.Machines {
if m.Name == machineid {
if m.LastSeen.AsTime().After(
time.Now().Add(-5 * time.Minute),
) {
connected = true
}

return
if m.Name == machineID {
return m, nil
}
}

return false, nil
return nil, nil
}

// Close client
Expand Down
14 changes: 12 additions & 2 deletions cmd/metal-api/internal/service/async-actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"errors"
"fmt"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/headscale"

"go.uber.org/zap"

ipamer "github.com/metal-stack/go-ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
"github.com/metal-stack/metal-lib/bus"
"github.com/metal-stack/metal-lib/pkg/tag"
"go.uber.org/zap"
)

type asyncActor struct {
Expand Down Expand Up @@ -39,11 +42,18 @@ func newAsyncActor(l *zap.SugaredLogger, ep *bus.Endpoints, ds *datastore.Rethin
return actor, nil
}

func (a *asyncActor) freeMachine(pub bus.Publisher, m *metal.Machine) error {
func (a *asyncActor) freeMachine(pub bus.Publisher, m *metal.Machine, headscaleClient *headscale.HeadscaleClient, logger *zap.SugaredLogger) error {
if m.State.Value == metal.LockedState {
return errors.New("machine is locked")
}

if headscaleClient != nil {
// always call DeleteMachine, in case machine is not registered it will return nil
if err := headscaleClient.DeleteMachine(m.ID, m.Allocation.Project); err != nil {
logger.Error("unable to delete Node entry from headscale DB", zap.String("machineID", m.ID), zap.Error(err))
}
}

err := deleteVRFSwitches(a.RethinkStore, m, a.log.Desugar())
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions cmd/metal-api/internal/service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"

"github.com/testcontainers/testcontainers-go"
"go.uber.org/zap/zaptest"

metalgrpc "github.com/metal-stack/metal-api/cmd/metal-api/internal/grpc"
"github.com/metal-stack/metal-api/test"
"github.com/metal-stack/metal-lib/bus"
"github.com/metal-stack/security"
"github.com/testcontainers/testcontainers-go"
"go.uber.org/zap/zaptest"

mdmv1 "github.com/metal-stack/masterdata-api/api/v1"
mdmv1mock "github.com/metal-stack/masterdata-api/api/v1/mocks"
mdm "github.com/metal-stack/masterdata-api/pkg/client"

restful "github.com/emicklei/go-restful/v3"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/metal"
Expand Down Expand Up @@ -98,7 +100,7 @@ func createTestEnvironment(t *testing.T) testEnv {

hma := security.NewHMACAuth(testUserDirectory.admin.Name, []byte{1, 2, 3}, security.WithUser(testUserDirectory.admin))
usergetter := security.NewCreds(security.WithHMAC(hma))
machineService, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipamer, mdc, nil, usergetter, 0)
machineService, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipamer, mdc, nil, usergetter, 0, nil)
require.NoError(t, err)
imageService := NewImage(log, ds)
switchService := NewSwitch(log, ds)
Expand Down
18 changes: 13 additions & 5 deletions cmd/metal-api/internal/service/machine-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ import (
"strings"
"time"

"github.com/metal-stack/metal-api/cmd/metal-api/internal/headscale"

"github.com/avast/retry-go/v4"
"github.com/aws/aws-sdk-go/service/s3"

s3server "github.com/metal-stack/metal-api/cmd/metal-api/internal/service/s3client"
"github.com/metal-stack/security"

"golang.org/x/crypto/ssh"

"go.uber.org/zap"

"github.com/metal-stack/metal-lib/httperrors"
"github.com/metal-stack/metal-lib/pkg/tag"
"go.uber.org/zap"

mdmv1 "github.com/metal-stack/masterdata-api/api/v1"
mdm "github.com/metal-stack/masterdata-api/pkg/client"
Expand All @@ -32,6 +36,7 @@ import (

restfulspec "github.com/emicklei/go-restful-openapi/v2"
"github.com/emicklei/go-restful/v3"

"github.com/metal-stack/metal-lib/bus"
)

Expand All @@ -44,6 +49,7 @@ type machineResource struct {
s3Client *s3server.Client
userGetter security.UserGetter
reasonMinLength uint
headscaleClient *headscale.HeadscaleClient
}

// machineAllocationSpec is a specification for a machine allocation
Expand Down Expand Up @@ -105,6 +111,7 @@ func NewMachine(
s3Client *s3server.Client,
userGetter security.UserGetter,
reasonMinLength uint,
headscaleClient *headscale.HeadscaleClient,
) (*restful.WebService, error) {
r := machineResource{
webResource: webResource{
Expand All @@ -117,6 +124,7 @@ func NewMachine(
s3Client: s3Client,
userGetter: userGetter,
reasonMinLength: reasonMinLength,
headscaleClient: headscaleClient,
}
var err error
r.actor, err = newAsyncActor(log, ep, ds, ipamer)
Expand Down Expand Up @@ -1502,7 +1510,7 @@ func (r machineResource) freeMachine(request *restful.Request, response *restful
logger.Error("unable to publish machine command", zap.String("command", string(metal.ChassisIdentifyLEDOffCmd)), zap.String("machineID", m.ID), zap.Error(err))
}

err = r.actor.freeMachine(r.Publisher, m)
err = r.actor.freeMachine(r.Publisher, m, r.headscaleClient, logger)
if err != nil {
r.sendError(request, response, defaultError(err))
return
Expand Down Expand Up @@ -1782,7 +1790,7 @@ func evaluateMachineLiveliness(ds *datastore.RethinkStore, m metal.Machine) (met
}

// ResurrectMachines attempts to resurrect machines that are obviously dead
func ResurrectMachines(ds *datastore.RethinkStore, publisher bus.Publisher, ep *bus.Endpoints, ipamer ipam.IPAMer, logger *zap.SugaredLogger) error {
func ResurrectMachines(ds *datastore.RethinkStore, publisher bus.Publisher, ep *bus.Endpoints, ipamer ipam.IPAMer, headscaleClient *headscale.HeadscaleClient, logger *zap.SugaredLogger) error {
logger.Info("machine resurrection was requested")

machines, err := ds.ListMachines()
Expand Down Expand Up @@ -1814,7 +1822,7 @@ func ResurrectMachines(ds *datastore.RethinkStore, publisher bus.Publisher, ep *

if provisioningEvents.Liveliness == metal.MachineLivelinessDead && time.Since(*provisioningEvents.LastEventTime) > metal.MachineResurrectAfter {
logger.Infow("resurrecting dead machine", "machineID", m.ID, "liveliness", provisioningEvents.Liveliness, "since", time.Since(*provisioningEvents.LastEventTime).String())
err = act.freeMachine(publisher, &m)
err = act.freeMachine(publisher, &m, headscaleClient, logger)
if err != nil {
logger.Errorw("error during machine resurrection", "machineID", m.ID, "error", err)
}
Expand All @@ -1823,7 +1831,7 @@ func ResurrectMachines(ds *datastore.RethinkStore, publisher bus.Publisher, ep *

if provisioningEvents.FailedMachineReclaim {
logger.Infow("resurrecting machine with failed reclaim", "machineID", m.ID, "liveliness", provisioningEvents.Liveliness, "since", time.Since(*provisioningEvents.LastEventTime).String())
err = act.freeMachine(publisher, &m)
err = act.freeMachine(publisher, &m, headscaleClient, logger)
if err != nil {
logger.Errorw("error during machine resurrection", "machineID", m.ID, "error", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ import (
"testing"
"time"

grpcv1 "github.com/metal-stack/metal-api/pkg/api/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

grpcv1 "github.com/metal-stack/metal-api/pkg/api/v1"

"github.com/avast/retry-go/v4"
"github.com/emicklei/go-restful/v3"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

goipam "github.com/metal-stack/go-ipam"
mdmv1 "github.com/metal-stack/masterdata-api/api/v1"
mdmv1mock "github.com/metal-stack/masterdata-api/api/v1/mocks"
Expand All @@ -33,9 +38,6 @@ import (
"github.com/metal-stack/metal-lib/bus"
"github.com/metal-stack/metal-lib/rest"
"github.com/metal-stack/security"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)

var (
Expand Down Expand Up @@ -328,7 +330,7 @@ func setupTestEnvironment(machineCount int, t *testing.T) (*datastore.RethinkSto
}()

usergetter := security.NewCreds(security.WithHMAC(hma))
ms, err := NewMachine(log, rs, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(ipamer), mdc, nil, usergetter, 0)
ms, err := NewMachine(log, rs, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(ipamer), mdc, nil, usergetter, 0, nil)
require.NoError(t, err)
container := restful.NewContainer().Add(ms)
container.Filter(rest.UserAuth(usergetter, zaptest.NewLogger(t).Sugar()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func BenchmarkMachineList(b *testing.B) {
require.NoError(b, err)
}

machineService, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), nil, nil, nil, nil, 0)
machineService, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), nil, nil, nil, nil, 0, nil)
require.NoError(b, err)

b.ResetTimer()
Expand Down
31 changes: 16 additions & 15 deletions cmd/metal-api/internal/service/machine-service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"testing"

"github.com/emicklei/go-restful/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/crypto/ssh"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"

goipam "github.com/metal-stack/go-ipam"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/datastore"
"github.com/metal-stack/metal-api/cmd/metal-api/internal/ipam"
Expand All @@ -17,11 +23,6 @@ import (
"github.com/metal-stack/metal-api/cmd/metal-api/internal/testdata"
"github.com/metal-stack/metal-lib/bus"
"github.com/metal-stack/security"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/crypto/ssh"
r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

const (
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestGetMachines(t *testing.T) {
testdata.InitMockDBData(mock)
log := zaptest.NewLogger(t).Sugar()

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)
container := restful.NewContainer().Add(machineservice)
req := httptest.NewRequest("GET", "/v1/machine", nil)
Expand Down Expand Up @@ -121,7 +122,7 @@ func TestMachineIPMIReport(t *testing.T) {
for i := range tests {
tt := tests[i]
t.Run(tt.name, func(t *testing.T) {
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)
container := restful.NewContainer().Add(machineservice)
js, err := json.Marshal(tt.input)
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestMachineFindIPMI(t *testing.T) {
mock.On(r.DB("mockdb").Table("machine").Filter(r.MockAnything())).Return([]interface{}{*tt.machine}, nil)
testdata.InitMockDBData(mock)

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)
container := restful.NewContainer().Add(machineservice)

Expand Down Expand Up @@ -228,7 +229,7 @@ func TestSetMachineState(t *testing.T) {
Name: "anonymous",
}}

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, userGetter, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, userGetter, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand Down Expand Up @@ -269,7 +270,7 @@ func TestSetMachineStateIssuerResetWhenAvailable(t *testing.T) {
Name: "anonymous",
}}

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, userGetter, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, userGetter, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand Down Expand Up @@ -305,7 +306,7 @@ func TestGetMachine(t *testing.T) {
testdata.InitMockDBData(mock)
log := zaptest.NewLogger(t).Sugar()

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand Down Expand Up @@ -333,7 +334,7 @@ func TestGetMachineNotFound(t *testing.T) {
testdata.InitMockDBData(mock)
log := zaptest.NewLogger(t).Sugar()

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand Down Expand Up @@ -367,7 +368,7 @@ func TestFreeMachine(t *testing.T) {
return nil
}

machineservice, err := NewMachine(log, ds, pub, bus.NewEndpoints(nil, pub), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, pub, bus.NewEndpoints(nil, pub), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand All @@ -394,7 +395,7 @@ func TestSearchMachine(t *testing.T) {
testdata.InitMockDBData(mock)
log := zaptest.NewLogger(t).Sugar()

machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, &emptyPublisher{}, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)

container := restful.NewContainer().Add(machineservice)
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestOnMachine(t *testing.T) {
return nil
}

machineservice, err := NewMachine(log, ds, pub, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0)
machineservice, err := NewMachine(log, ds, pub, bus.DirectEndpoints(), ipam.New(goipam.New()), nil, nil, nil, 0, nil)
require.NoError(t, err)

js, err := json.Marshal([]string{tt.param})
Expand Down
4 changes: 2 additions & 2 deletions cmd/metal-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func initRestServices(withauth bool) *restfulspec.Config {
}
reasonMinLength := viper.GetUint("password-reason-minlength")

machineService, err := service.NewMachine(logger.Named("machine-service"), ds, p, ep, ipamer, mdc, s3Client, userGetter, reasonMinLength)
machineService, err := service.NewMachine(logger.Named("machine-service"), ds, p, ep, ipamer, mdc, s3Client, userGetter, reasonMinLength, headscaleClient)
if err != nil {
logger.Fatal(err)
}
Expand Down Expand Up @@ -820,7 +820,7 @@ func resurrectDeadMachines() error {
p = nsqer.Publisher
ep = nsqer.Endpoints
}
err = service.ResurrectMachines(ds, p, ep, ipamer, logger)
err = service.ResurrectMachines(ds, p, ep, ipamer, headscaleClient, logger)
if err != nil {
return fmt.Errorf("unable to resurrect machines: %w", err)
}
Expand Down

0 comments on commit 60abf6a

Please sign in to comment.