Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(federated): allow to pickup a specific worker, improve loadbalancing #3243

Merged
merged 4 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/cli/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ type FederatedCLI struct {
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
LoadBalanced bool `env:"LOCALAI_LOAD_BALANCED,LOAD_BALANCED" default:"false" help:"Enable load balancing" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
}

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced)
fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, f.LoadBalanced, f.TargetWorker)

return fs.Start(context.Background())
}
39 changes: 36 additions & 3 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package p2p

import "fmt"
import (
"fmt"
"math/rand/v2"

"github.com/rs/zerolog/log"
)

const FederatedID = "federated"

Expand All @@ -15,19 +20,46 @@
listenAddr, service, p2ptoken string
requestTable map[string]int
loadBalanced bool
workerTarget string
}

func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool) *FederatedServer {
func NewFederatedServer(listenAddr, service, p2pToken string, loadBalanced bool, workerTarget string) *FederatedServer {
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
requestTable: map[string]int{},
loadBalanced: loadBalanced,
workerTarget: workerTarget,
}
}

func (fs *FederatedServer) RandomServer() string {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
return ""
}

return tunnelAddresses[rand.IntN(len(tunnelAddresses))]
Fixed Show fixed Hide fixed
Dismissed Show dismissed Hide dismissed
}

func (fs *FederatedServer) SelectLeastUsedServer() string {
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
fs.ensureRecordExist(v.TunnelAddress)
} else {
delete(fs.requestTable, v.TunnelAddress)
}
}

// cycle over requestTable and find the entry with the lower number
// if there are multiple entries with the same number, select one randomly
// if there are no entries, return an empty string
Expand All @@ -39,6 +71,7 @@
minKey = k
}
}

return minKey
}

Expand All @@ -47,7 +80,7 @@
fs.requestTable[nodeID]++
}

func (fs *FederatedServer) EnsureRecordExist(nodeID string) {
func (fs *FederatedServer) ensureRecordExist(nodeID string) {
// if the nodeID is not in the requestTable, add it with a counter of 0
_, ok := fs.requestTable[nodeID]
if !ok {
Expand Down
40 changes: 16 additions & 24 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"net"
"time"

"math/rand/v2"

"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
Expand Down Expand Up @@ -76,7 +74,7 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
log.Debug().Msg("New connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
Expand All @@ -86,36 +84,30 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}

tunnelAddr := ""

if fs.loadBalanced {
for _, t := range tunnelAddresses {
fs.EnsureRecordExist(t)
if fs.workerTarget != "" {
for _, v := range GetAvailableNodes(fs.service) {
if v.ID == fs.workerTarget {
tunnelAddr = v.TunnelAddress
break
}
}

} else if fs.loadBalanced {
tunnelAddr = fs.SelectLeastUsedServer()
log.Debug().Msgf("Selected tunnel %s", tunnelAddr)
if tunnelAddr == "" {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
if tunnelAddr != "" {
tunnelAddr = fs.RandomServer()
}

fs.RecordRequest(tunnelAddr)
} else {
tunnelAddr = tunnelAddresses[rand.IntN(len(tunnelAddresses))]
tunnelAddr = fs.RandomServer()
}

if tunnelAddr == "" {
log.Error().Msg("No available nodes yet")
return
}

tunnelConn, err := net.Dial("tcp", tunnelAddr)
Expand Down
4 changes: 3 additions & 1 deletion core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}

// get new services, allocate and return to the channel

// TODO:
Expand All @@ -201,6 +200,9 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
zlog.Debug().Msg("Searching for workers")

data := ledger.LastBlock().Storage[servicesID]

zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")

for k, v := range data {
zlog.Info().Msgf("Found worker %s", k)
nd := &NodeData{}
Expand Down
Loading