Skip to content

Commit 698a4be

Browse files
committed
configure mq on startup
1 parent f509dff commit 698a4be

File tree

3 files changed

+115
-52
lines changed

3 files changed

+115
-52
lines changed

main.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ func startControllers() {
139139
logger.Log(0, "error occurred initializing DNS: ", err.Error())
140140
}
141141
}
142+
if servercfg.IsMessageQueueBackend() {
143+
if err := mq.Configure(); err != nil {
144+
logger.FatalLog("failed to configure MQ: ", err.Error())
145+
}
146+
}
147+
142148
//Run Rest Server
143149
if servercfg.IsRestBackend() {
144150
if !servercfg.DisableRemoteIPCheck() && servercfg.GetAPIHost() == "127.0.0.1" {
@@ -150,7 +156,6 @@ func startControllers() {
150156
waitnetwork.Add(1)
151157
go controller.HandleRESTRequests(&waitnetwork)
152158
}
153-
154159
//Run MessageQueue
155160
if servercfg.IsMessageQueueBackend() {
156161
waitnetwork.Add(1)
@@ -169,8 +174,7 @@ func runMessageQueue(wg *sync.WaitGroup) {
169174
defer wg.Done()
170175
brokerHost, secure := servercfg.GetMessageQueueEndpoint()
171176
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
172-
// update admin password and re-create client
173-
mq.Configure()
177+
mq.SetUpAdminClient()
174178
mq.SetupMQTT()
175179
ctx, cancel := context.WithCancel(context.Background())
176180
go mq.DynamicSecManager(ctx)

mq/dynsec.go

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,33 @@ package mq
22

33
import (
44
"context"
5+
"crypto/sha512"
6+
"encoding/base64"
57
"encoding/json"
8+
"errors"
69
"fmt"
10+
"os"
711

812
mqtt "github.com/eclipse/paho.mqtt.golang"
913
"github.com/gravitl/netmaker/logger"
14+
"github.com/gravitl/netmaker/logic"
15+
"github.com/gravitl/netmaker/servercfg"
16+
"golang.org/x/crypto/pbkdf2"
1017
)
1118

1219
const DynamicSecSubTopic = "$CONTROL/dynamic-security/#"
1320
const DynamicSecPubTopic = "$CONTROL/dynamic-security/v1"
1421

1522
type DynSecActionType string
1623

24+
var mqAdminClient mqtt.Client
25+
1726
var (
18-
CreateClient DynSecActionType = "CREATE_CLIENT"
19-
DisableClient DynSecActionType = "DISABLE_CLIENT"
20-
EnableClient DynSecActionType = "ENABLE_CLIENT"
21-
DeleteClient DynSecActionType = "DELETE_CLIENT"
22-
CreateAdminClient DynSecActionType = "CREATE_ADMIN_CLIENT"
23-
ModifyClient DynSecActionType = "MODIFY_CLIENT"
24-
DISABLE_EXISTING_ADMINS DynSecActionType = "DISABLE_EXISTING_ADMINS"
27+
CreateClient DynSecActionType = "CREATE_CLIENT"
28+
DisableClient DynSecActionType = "DISABLE_CLIENT"
29+
EnableClient DynSecActionType = "ENABLE_CLIENT"
30+
DeleteClient DynSecActionType = "DELETE_CLIENT"
31+
ModifyClient DynSecActionType = "MODIFY_CLIENT"
2532
)
2633

2734
var (
@@ -32,10 +39,43 @@ var (
3239
)
3340

3441
var (
35-
mqDynSecAdmin string = "Netmaker-Admin"
36-
adminPassword string = "Netmaker-Admin"
42+
mqAdminUserName string = "Netmaker-Admin"
43+
mqNetmakerServerUserName string = "Netmaker-Server"
3744
)
3845

46+
type client struct {
47+
Username string `json:"username"`
48+
TextName string `json:"textName"`
49+
Password string `json:"password"`
50+
Salt string `json:"salt"`
51+
Iterations int `json:"iterations"`
52+
Roles []struct {
53+
Rolename string `json:"rolename"`
54+
} `json:"roles"`
55+
}
56+
57+
type role struct {
58+
Rolename string `json:"rolename"`
59+
Acls []struct {
60+
Acltype string `json:"acltype"`
61+
Topic string `json:"topic"`
62+
Allow bool `json:"allow"`
63+
} `json:"acls"`
64+
}
65+
66+
type defaultAccessAcl struct {
67+
PublishClientSend bool `json:"publishClientSend"`
68+
PublishClientReceive bool `json:"publishClientReceive"`
69+
Subscribe bool `json:"subscribe"`
70+
Unsubscribe bool `json:"unsubscribe"`
71+
}
72+
73+
type dynCnf struct {
74+
Clients []client `json:"clients"`
75+
Roles []role `json:"roles"`
76+
DefaultACLAccess defaultAccessAcl `json:"defaultACLAccess"`
77+
}
78+
3979
type MqDynSecGroup struct {
4080
Groupname string `json:"groupname"`
4181
Priority int `json:"priority"`
@@ -77,6 +117,39 @@ type MqDynsecPayload struct {
77117

78118
var DynSecChan = make(chan DynSecAction, 100)
79119

120+
func encodePasswordToPBKDF2(password string, salt string, iterations int, keyLength int) string {
121+
binaryEncoded := pbkdf2.Key([]byte(password), []byte(salt), iterations, keyLength, sha512.New)
122+
return base64.StdEncoding.EncodeToString(binaryEncoded)
123+
}
124+
125+
func Configure() error {
126+
file := "/root/dynamic-security.json"
127+
b, err := os.ReadFile(file)
128+
if err != nil {
129+
return err
130+
}
131+
c := dynCnf{}
132+
json.Unmarshal(b, &c)
133+
password := servercfg.GetMqAdminPassword()
134+
if password == "" {
135+
return errors.New("MQ admin password not provided")
136+
}
137+
for i, cI := range c.Clients {
138+
if cI.Username == mqAdminUserName || cI.Username == mqNetmakerServerUserName {
139+
salt := logic.RandomString(12)
140+
hashed := encodePasswordToPBKDF2(password, salt, 101, 64)
141+
cI.Password = hashed
142+
cI.Salt = base64.StdEncoding.EncodeToString([]byte(salt))
143+
c.Clients[i] = cI
144+
}
145+
}
146+
data, err := json.MarshalIndent(c, "", " ")
147+
if err != nil {
148+
return err
149+
}
150+
return os.WriteFile(file, data, 0755)
151+
}
152+
80153
func DynamicSecManager(ctx context.Context) {
81154
defer close(DynSecChan)
82155
for {

mq/mq.go

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package mq
22

33
import (
44
"context"
5-
"encoding/json"
65
"time"
76

87
mqtt "github.com/eclipse/paho.mqtt.golang"
98
"github.com/gravitl/netmaker/logger"
10-
"github.com/gravitl/netmaker/logic"
119
"github.com/gravitl/netmaker/netclient/ncutils"
1210
"github.com/gravitl/netmaker/servercfg"
1311
)
@@ -24,68 +22,56 @@ var peer_force_send = 0
2422

2523
var mqclient mqtt.Client
2624

27-
func Configure() {
25+
func SetUpAdminClient() {
2826
opts := mqtt.NewClientOptions()
29-
broker, _ := servercfg.GetMessageQueueEndpoint()
30-
opts.AddBroker(broker)
31-
id := ncutils.MakeRandomString(23)
32-
opts.ClientID = id
33-
opts.SetUsername(mqDynSecAdmin)
34-
opts.SetPassword(adminPassword)
35-
opts.SetAutoReconnect(true)
36-
opts.SetConnectRetry(true)
37-
opts.SetConnectRetryInterval(time.Second << 2)
38-
opts.SetKeepAlive(time.Minute)
39-
opts.SetWriteTimeout(time.Minute)
40-
mqclient := mqtt.NewClient(opts)
27+
setMqOptions(mqAdminUserName, servercfg.GetMqAdminPassword(), opts)
28+
mqAdminClient = mqtt.NewClient(opts)
29+
opts.SetOnConnectHandler(func(client mqtt.Client) {
30+
if token := client.Subscribe(DynamicSecSubTopic, 0, mqtt.MessageHandler(watchDynSecTopic)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
31+
client.Disconnect(240)
32+
logger.Log(0, "Dynamic security client subscription failed")
33+
}
34+
35+
opts.SetOrderMatters(true)
36+
opts.SetResumeSubs(true)
37+
})
4138
tperiod := time.Now().Add(10 * time.Second)
4239
for {
43-
if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
44-
logger.Log(2, "unable to connect to broker, retrying ...")
40+
if token := mqAdminClient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
41+
logger.Log(2, "Admin: unable to connect to broker, retrying ...")
4542
if time.Now().After(tperiod) {
4643
if token.Error() == nil {
47-
logger.FatalLog("could not connect to broker, token timeout, exiting ...")
44+
logger.FatalLog("Admin: could not connect to broker, token timeout, exiting ...")
4845
} else {
49-
logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error())
46+
logger.FatalLog("Admin: could not connect to broker, exiting ...", token.Error().Error())
5047
}
5148
}
5249
} else {
5350
break
5451
}
5552
time.Sleep(2 * time.Second)
5653
}
57-
newAdminPassword := logic.GenKey()
58-
payload := MqDynsecPayload{
59-
Commands: []MqDynSecCmd{
60-
{
61-
Command: ModifyClientCmd,
62-
Username: mqDynSecAdmin,
63-
Password: newAdminPassword,
64-
},
65-
},
66-
}
67-
d, _ := json.Marshal(payload)
68-
if token := mqclient.Publish(DynamicSecPubTopic, 0, true, d); token.Error() != nil {
69-
logger.FatalLog("failed to modify admin password: ", token.Error().Error())
70-
}
71-
mqclient.Disconnect(2)
72-
adminPassword = newAdminPassword
54+
7355
}
7456

75-
// SetupMQTT creates a connection to broker and return client
76-
func SetupMQTT() {
77-
opts := mqtt.NewClientOptions()
57+
func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
7858
broker, _ := servercfg.GetMessageQueueEndpoint()
7959
opts.AddBroker(broker)
8060
id := ncutils.MakeRandomString(23)
8161
opts.ClientID = id
82-
opts.SetUsername(mqDynSecAdmin)
83-
opts.SetPassword(adminPassword)
62+
opts.SetUsername(user)
63+
opts.SetPassword(password)
8464
opts.SetAutoReconnect(true)
8565
opts.SetConnectRetry(true)
8666
opts.SetConnectRetryInterval(time.Second << 2)
8767
opts.SetKeepAlive(time.Minute)
8868
opts.SetWriteTimeout(time.Minute)
69+
}
70+
71+
// SetupMQTT creates a connection to broker and return client
72+
func SetupMQTT() {
73+
opts := mqtt.NewClientOptions()
74+
setMqOptions(mqNetmakerServerUserName, servercfg.GetMqAdminPassword(), opts)
8975
opts.SetOnConnectHandler(func(client mqtt.Client) {
9076
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
9177
client.Disconnect(240)

0 commit comments

Comments
 (0)