Skip to content

Commit 66069fb

Browse files
authored
NET-1082: Scale Testing Fixes (#2894)
* add additional mutex lock on node acls func * increase verbosity * disable acls on cloud emqx * add emqx creds creation to go routine * add debug log of mq client id * comment port check * uncomment port check * check for connection mq connection open * use username for client id * add write mutex on acl is allowed * add mq connection lost handler on server * spin off zombie init as go routine * get whole api path from config * Revert "get whole api path from config" This reverts commit 392f5f4. * update extclient acls async * add additional mutex lock on node acls func (cherry picked from commit 5325f0e) * increase verbosity (cherry picked from commit 705b3cf) * add emqx creds creation to go routine (cherry picked from commit c8e65f4) * add debug log of mq client id (cherry picked from commit 29c5d6c) * comment port check (cherry picked from commit db8d6d9) * check for connection mq connection open (cherry picked from commit 13b1103) * use username for client id (cherry picked from commit e90c738) * add write mutex on acl is allowed (cherry picked from commit 4cae1b0) * add mq connection lost handler on server (cherry picked from commit c82918a) * spin off zombie init as go routine (cherry picked from commit 6d65c44) * update extclient acls async (cherry picked from commit 6557ef1) * additional logs for oauth user flow (cherry picked from commit 6170303) * add more debug logs (cherry picked from commit 5980bea) * add more debug logs (cherry picked from commit 4d001f0) * add set auth secret (cherry picked from commit f41cef5) * fix fetch pass (cherry picked from commit 825caf4) * make sure auth secret is set only once (cherry picked from commit ba33ed0) * make sure auth secret is set only once (cherry picked from commit 920ac4c) * comment usage of emqx acls * replace read lock with write lock on acls * replace read lock with write lock on acls (cherry picked from commit 808d213) * use deadlock pkg for visibility * add additional mutex locks * remove race flag * on 
mq re-connecting do not exit if failed * on mq re-connecting do not exit if failed * revert mutex package change * set mq clean session * remove debug log * go mod tidy * revert on prem emqx acls del
1 parent 0b2422b commit 66069fb

File tree

13 files changed

+71
-179
lines changed

13 files changed

+71
-179
lines changed

controllers/enrollmentkeys.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
308308
if !hostExists {
309309
newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
310310
// register host
311-
logic.CheckHostPorts(&newHost)
311+
//logic.CheckHostPorts(&newHost)
312312
// create EMQX credentials and ACLs for host
313313
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
314314
if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {

controllers/ext_client.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,15 +436,14 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
436436
return
437437
}
438438

439-
if err := logic.SetClientDefaultACLs(&extclient); err != nil {
440-
slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
441-
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
442-
return
443-
}
444-
445439
slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID)
446440
w.WriteHeader(http.StatusOK)
447441
go func() {
442+
if err := logic.SetClientDefaultACLs(&extclient); err != nil {
443+
slog.Error("failed to set default acls for extclient", "user", r.Header.Get("user"), "network", node.Network, "error", err)
444+
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
445+
return
446+
}
448447
if err := mq.PublishPeerUpdate(false); err != nil {
449448
logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
450449
}

controllers/hosts.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -554,26 +554,27 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
554554
logic.ReturnErrorResponse(response, request, errorResponse)
555555
return
556556
}
557-
558-
// Create EMQX creds and ACLs if not found
559-
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
560-
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
561-
slog.Error("failed to create host credentials for EMQX: ", err.Error())
562-
} else {
563-
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
564-
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
565-
}
566-
for _, nodeID := range host.Nodes {
567-
if node, err := logic.GetNodeByID(nodeID); err == nil {
568-
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
569-
slog.Error("failed to add ACLs for EMQX node", "error", err)
557+
go func() {
558+
// Create EMQX creds and ACLs if not found
559+
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
560+
if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
561+
slog.Error("failed to create host credentials for EMQX: ", err.Error())
562+
} else {
563+
if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
564+
slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
565+
}
566+
for _, nodeID := range host.Nodes {
567+
if node, err := logic.GetNodeByID(nodeID); err == nil {
568+
if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
569+
slog.Error("failed to add ACLs for EMQX node", "error", err)
570+
}
571+
} else {
572+
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
570573
}
571-
} else {
572-
slog.Error("failed to get node", "nodeid", nodeID, "error", err)
573574
}
574575
}
575576
}
576-
}
577+
}()
577578

578579
response.WriteHeader(http.StatusOK)
579580
response.Header().Set("Content-Type", "application/json")

logic/acls/common.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ func (acl ACL) Save(containerID ContainerID, ID AclID) (ACL, error) {
6464

6565
// ACL.IsAllowed - sees if ID is allowed in referring ACL
6666
func (acl ACL) IsAllowed(ID AclID) (allowed bool) {
67-
AclMutex.RLock()
67+
AclMutex.Lock()
6868
allowed = acl[ID] == Allowed
69-
AclMutex.RUnlock()
69+
AclMutex.Unlock()
7070
return
7171
}
7272

@@ -88,6 +88,8 @@ func (aclContainer ACLContainer) RemoveACL(ID AclID) ACLContainer {
8888

8989
// ACLContainer.ChangeAccess - changes the relationship between two nodes in memory
9090
func (networkACL ACLContainer) ChangeAccess(ID1, ID2 AclID, value byte) {
91+
AclMutex.Lock()
92+
defer AclMutex.Unlock()
9193
if _, ok := networkACL[ID1]; !ok {
9294
slog.Error("ACL missing for ", "id", ID1)
9395
return

logic/acls/nodeacls/retrieve.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,26 @@ package nodeacls
33
import (
44
"encoding/json"
55
"fmt"
6+
"sync"
67

78
"github.com/gravitl/netmaker/logic/acls"
89
)
910

11+
var NodesAllowedACLMutex = &sync.Mutex{}
12+
1013
// AreNodesAllowed - checks if nodes are allowed to communicate in their network ACL
1114
func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
15+
NodesAllowedACLMutex.Lock()
16+
defer NodesAllowedACLMutex.Unlock()
1217
var currentNetworkACL, err = FetchAllACLs(networkID)
1318
if err != nil {
1419
return false
1520
}
1621
var allowed bool
17-
acls.AclMutex.RLock()
22+
acls.AclMutex.Lock()
1823
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
1924
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
20-
acls.AclMutex.RUnlock()
25+
acls.AclMutex.Unlock()
2126
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
2227
return allowed
2328
}

logic/errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"encoding/json"
55
"net/http"
66

7-
"github.com/gravitl/netmaker/logger"
87
"github.com/gravitl/netmaker/models"
8+
"golang.org/x/exp/slog"
99
)
1010

1111
// FormatError - takes ErrorResponse and uses correct code
@@ -62,7 +62,7 @@ func ReturnErrorResponse(response http.ResponseWriter, request *http.Request, er
6262
if err != nil {
6363
panic(err)
6464
}
65-
logger.Log(1, "processed request error:", errorMessage.Message)
65+
slog.Debug("processed request error", "err", errorMessage.Message)
6666
response.Header().Set("Content-Type", "application/json")
6767
response.WriteHeader(errorMessage.Code)
6868
response.Write(jsonResponse)

logic/zombie.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func checkForZombieHosts(h *models.Host) {
7676
// ManageZombies - goroutine which adds/removes/deletes nodes from the zombie node quarantine list
7777
func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) {
7878
logger.Log(2, "Zombie management started")
79-
InitializeZombies()
79+
go InitializeZombies()
8080

8181
// Zombie Nodes Cleanup Four Times a Day
8282
ticker := time.NewTicker(time.Hour * ZOMBIE_TIMEOUT)

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
155155
defer wg.Done()
156156
brokerHost, _ := servercfg.GetMessageQueueEndpoint()
157157
logger.Log(0, "connecting to mq broker at", brokerHost)
158-
mq.SetupMQTT()
158+
mq.SetupMQTT(true)
159159
if mq.IsConnected() {
160160
logger.Log(0, "connected to MQ Broker")
161161
} else {

migrate/migrate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func updateAcls() {
287287
}
288288

289289
// save new acls
290-
slog.Info(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
290+
slog.Debug(fmt.Sprintf("(migration) saving new acls for network: %s", network.NetID), "networkAcl", networkAcl)
291291
if _, err := networkAcl.Save(acls.ContainerID(network.NetID)); err != nil {
292292
slog.Error(fmt.Sprintf("error during acls migration. error saving new acls for network: %s", network.NetID), "error", err)
293293
continue

mq/emqx_cloud.go

Lines changed: 3 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,6 @@ type userCreateReq struct {
2222
Password string `json:"password"`
2323
}
2424

25-
type cloudAcl struct {
26-
UserName string `json:"username"`
27-
Topic string `json:"topic"`
28-
Action string `json:"action"`
29-
Access string `json:"access"`
30-
}
31-
3225
func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
3326

3427
func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
@@ -89,54 +82,7 @@ func (e *EmqxCloud) CreateEmqxUserforServer() error {
8982
if res.StatusCode != http.StatusOK {
9083
return errors.New("request failed " + string(body))
9184
}
92-
// add acls
93-
acls := []cloudAcl{
94-
{
95-
UserName: servercfg.GetMqUserName(),
96-
Topic: fmt.Sprintf("update/%s/#", servercfg.GetServer()),
97-
Access: "allow",
98-
Action: "sub",
99-
},
100-
{
101-
UserName: servercfg.GetMqUserName(),
102-
Topic: fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
103-
Access: "allow",
104-
Action: "sub",
105-
},
106-
{
107-
UserName: servercfg.GetMqUserName(),
108-
Topic: fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
109-
Access: "allow",
110-
Action: "sub",
111-
},
112-
{
113-
UserName: servercfg.GetMqUserName(),
114-
Topic: fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
115-
Access: "allow",
116-
Action: "sub",
117-
},
118-
{
119-
UserName: servercfg.GetMqUserName(),
120-
Topic: "peers/host/#",
121-
Access: "allow",
122-
Action: "pub",
123-
},
124-
{
125-
UserName: servercfg.GetMqUserName(),
126-
Topic: "node/update/#",
127-
Access: "allow",
128-
Action: "pub",
129-
},
130-
{
131-
132-
UserName: servercfg.GetMqUserName(),
133-
Topic: "host/update/#",
134-
Access: "allow",
135-
Action: "pub",
136-
},
137-
}
138-
139-
return e.createacls(acls)
85+
return nil
14086
}
14187

14288
func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // no-op: always returns nil
@@ -147,94 +93,13 @@ func (e *EmqxCloud) CreateDefaultDenyRule() error {
14793
return nil
14894
}
14995

150-
func (e *EmqxCloud) createacls(acls []cloudAcl) error {
151-
payload, err := json.Marshal(acls)
152-
if err != nil {
153-
return err
154-
}
155-
client := &http.Client{}
156-
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
157-
if err != nil {
158-
return err
159-
}
160-
req.Header.Add("Content-Type", "application/json")
161-
req.SetBasicAuth(e.AppID, e.AppSecret)
162-
res, err := client.Do(req)
163-
if err != nil {
164-
return err
165-
}
166-
defer res.Body.Close()
167-
168-
body, err := io.ReadAll(res.Body)
169-
if err != nil {
170-
return err
171-
}
172-
if res.StatusCode != http.StatusOK {
173-
return errors.New("request failed " + string(body))
174-
}
175-
return nil
176-
}
177-
17896
func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
179-
acls := []cloudAcl{
180-
{
181-
UserName: hostID,
182-
Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
183-
Access: "allow",
184-
Action: "sub",
185-
},
186-
{
187-
UserName: hostID,
188-
Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
189-
Access: "allow",
190-
Action: "sub",
191-
},
192-
{
193-
UserName: hostID,
194-
Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
195-
Access: "allow",
196-
Action: "pub",
197-
},
198-
}
199-
200-
return e.createacls(acls)
97+
return nil
20198
}
20299

203100
func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
204-
acls := []cloudAcl{
205-
{
206-
UserName: hostID,
207-
Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
208-
Access: "allow",
209-
Action: "sub",
210-
},
211-
{
212-
UserName: hostID,
213-
Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
214-
Access: "allow",
215-
Action: "pubsub",
216-
},
217-
{
218-
UserName: hostID,
219-
Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
220-
Access: "allow",
221-
Action: "pubsub",
222-
},
223-
{
224-
UserName: hostID,
225-
Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
226-
Access: "allow",
227-
Action: "pubsub",
228-
},
229-
{
230-
UserName: hostID,
231-
Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
232-
Access: "allow",
233-
Action: "pubsub",
234-
},
235-
}
101+
return nil
236102

237-
return e.createacls(acls)
238103
}
239104

240105
func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // unused on cloud since it doesn't overwrite acls list

0 commit comments

Comments
 (0)