diff --git a/cmd/cluster/main.go b/cmd/cluster/main.go index c57edf8..07a4941 100644 --- a/cmd/cluster/main.go +++ b/cmd/cluster/main.go @@ -38,6 +38,12 @@ import ( var agent *cs.Agent +type mqttCmd struct { + TCP string + HTTP string + WS string +} + func pprof() { go func() { log.Info("listen pprof", "error", http.ListenAndServe(":6060", nil)) @@ -55,16 +61,20 @@ func realMain(ctx context.Context) error { var err error var confFile string var members string + var WSList []*listeners.Websocket + var TCPList []*listeners.TCP + var HTTPList []*listeners.HTTPStats cfg := config.New() + cmd := mqttCmd{} flag.StringVar(&confFile, "conf", "", "read the program parameters from the config file") flag.UintVar(&cfg.StorageWay, "storage-way", 3, "storage way options:0 memory, 1 bolt, 2 badger, 3 redis") flag.UintVar(&cfg.Auth.Way, "auth-way", 0, "authentication way options:0 anonymous, 1 username and password, 2 clientid") flag.UintVar(&cfg.Auth.Datasource, "auth-ds", 0, "authentication datasource options:0 free, 1 redis, 2 mysql, 3 postgresql, 4 http") flag.StringVar(&cfg.Auth.ConfPath, "auth-path", "", "config file path should correspond to the auth-datasource") - flag.StringVar(&cfg.Mqtt.TCP, "tcp", ":1883", "network address for mqtt tcp listener") - flag.StringVar(&cfg.Mqtt.WS, "ws", ":1882", "network address for mqtt websocket listener") - flag.StringVar(&cfg.Mqtt.HTTP, "http", ":8080", "network address for web info dashboard listener") + flag.StringVar(&cmd.TCP, "tcp", ":0", "network address for mqtt tcp listener") + flag.StringVar(&cmd.WS, "ws", ":0", "network address for mqtt websocket listener") + flag.StringVar(&cmd.HTTP, "http", ":0", "network address for web info dashboard listener") flag.StringVar(&cfg.Cluster.NodeName, "node-name", "", "node name must be unique in the cluster") flag.StringVar(&cfg.Cluster.BindAddr, "bind-ip", "127.0.0.1", "the ip used for discovery and communication between nodes. It is usually set to the intranet ip addr.") flag.IntVar(&cfg.Cluster.BindPort, "gossip-port", 7946, "this port is used to discover nodes in a cluster") @@ -122,29 +132,73 @@ func realMain(ctx context.Context) error { } // gen tls config - var listenerConfig *listeners.Config + var listenerTLSConfig *listeners.Config + var listenerConfig *listeners.Config + if tlsConfig, err := config.GenTlsConfig(cfg); err != nil { onError(err, "gen tls config") } else { if tlsConfig != nil { - listenerConfig = &listeners.Config{TLSConfig: tlsConfig} - } + listenerTLSConfig = &listeners.Config{TLSConfig: tlsConfig} + } else { + log.Info("TLS is not configured, all listeners will use unencrypted connections.") + } } - // add tcp listener - tcp := listeners.NewTCP("tcp", cfg.Mqtt.TCP, listenerConfig) - onError(server.AddListener(tcp), "add tcp listener") + // add cli tcp listener + if cmd.TCP != ":0" { + tcp := listeners.NewTCP("tcp", cmd.TCP, listenerTLSConfig) + onError(server.AddListener(tcp), "add tcp listener") + } + + // TCP Listeners from config file + TCPList = make([]*listeners.TCP, len(cfg.Mqtt.TCPListeners)) + for i := 0; i < len(cfg.Mqtt.TCPListeners); i++ { + if cfg.Mqtt.TCPListeners[i].Tls { + TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerTLSConfig) + } else { + TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerConfig) + } + onError(server.AddListener(TCPList[i]), "add tcp listener: " + cfg.Mqtt.TCPListeners[i].Name) + } // add websocket listener - ws := listeners.NewWebsocket("ws", cfg.Mqtt.WS, listenerConfig) - onError(server.AddListener(ws), "add websocket listener") + if cmd.WS != ":0" { + ws := listeners.NewWebsocket("ws", cmd.WS, listenerTLSConfig) + onError(server.AddListener(ws), "add websocket listener") + } + + // WS Listeners from config file + WSList = make([]*listeners.Websocket, len(cfg.Mqtt.WSListeners)) + for i := 0; i < len(cfg.Mqtt.WSListeners); i++ { + if cfg.Mqtt.WSListeners[i].Tls { + WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerTLSConfig) + } else { + WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerConfig) + } + onError(server.AddListener(WSList[i]), "add websocket listener: " + cfg.Mqtt.WSListeners[i].Name) + } // add http listener csHls := csRt.New(agent).GenHandlers() mqHls := mqttRt.New(server).GenHandlers() maps.Copy(csHls, mqHls) - http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, csHls) - onError(server.AddListener(http), "add http listener") + + if cmd.HTTP != ":0" { + http := listeners.NewHTTP("stats", cmd.HTTP, nil, csHls) + onError(server.AddListener(http), "add http listener") + } + + // HTTP Listeners from config file + HTTPList = make([]*listeners.HTTPStats, len(cfg.Mqtt.HTTPListeners)) + for i := 0; i < len(cfg.Mqtt.HTTPListeners); i++ { + if cfg.Mqtt.HTTPListeners[i].Tls { + HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerTLSConfig, csHls) + } else { + HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerConfig, csHls) + } + onError(server.AddListener(HTTPList[i]), "add http listener: " + cfg.Mqtt.HTTPListeners[i].Name) + } errCh := make(chan error, 1) // start server diff --git a/cmd/config/node1.yml b/cmd/config/node1.yml index 9ea66d1..373ed1d 100644 --- a/cmd/config/node1.yml +++ b/cmd/config/node1.yml @@ -29,9 +29,18 @@ cluster: inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative. mqtt: - tcp: :1883 - ws: :1882 - http: :8080 + TCPListeners: + - name: "TCP TLS Listener" + port: ":1883" + tls: true + WSListeners: + - name: "Websocket TLS Listener" + port: ":1882" + tls: true + HTTPListeners: + - name: "HTTPS Listener" + port: ":8080" + tls: true tls: ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication. server-cert: #Server certificate file path diff --git a/cmd/config/node2.yml b/cmd/config/node2.yml index ab0d088..48e412e 100644 --- a/cmd/config/node2.yml +++ b/cmd/config/node2.yml @@ -29,9 +29,18 @@ cluster: inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative. mqtt: - tcp: :1885 - ws: :1886 - http: :8081 + TCPListeners: + - name: "TCP TLS Listener" + port: ":1885" + tls: true + WSListeners: + - name: "Websocket TLS Listener" + port: ":1886" + tls: true + HTTPListeners: + - name: "HTTPS Listener" + port: ":8081" + tls: true tls: ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication. server-cert: #Server certificate file path @@ -77,4 +86,4 @@ log: max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename max-backups: 10 #MaxBackups is the maximum number of old log files to retain compress: true #Compress determines if the rotated log files should be compressed using gzip - level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. \ No newline at end of file + level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. diff --git a/cmd/config/node3.yml b/cmd/config/node3.yml index 891bd74..e668247 100644 --- a/cmd/config/node3.yml +++ b/cmd/config/node3.yml @@ -29,9 +29,18 @@ cluster: inout-pool-nonblocking: false #Pool size is unlimited, when inout-pool-nonblocking is true, inbound-pool-size and outbound-pool-size is inoperative. mqtt: - tcp: :1887 - ws: :1888 - http: :8082 + TCPListeners: + - name: "TCP TLS Listener" + port: ":1887" + tls: true + WSListeners: + - name: "Websocket TLS Listener" + port: ":1888" + tls: true + HTTPListeners: + - name: "HTTPS Listener" + port: ":8082" + tls: true tls: ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication. server-cert: #Server certificate file path @@ -77,4 +86,4 @@ log: max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename max-backups: 10 #MaxBackups is the maximum number of old log files to retain compress: true #Compress determines if the rotated log files should be compressed using gzip - level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. \ No newline at end of file + level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. diff --git a/cmd/config/single.yml b/cmd/config/single.yml index 96b279e..95a944d 100644 --- a/cmd/config/single.yml +++ b/cmd/config/single.yml @@ -10,9 +10,21 @@ auth: conf-path: ./config/auth-redis.yml #The config file path should correspond to the auth-datasource mqtt: - tcp: :1883 - ws: :1882 - http: :8080 + TCPListeners: + - name: "TCP TLS Listener" + port: ":1883" + tls: true #Use TLS if configured, otherwise fall back to an unencrypted connection. + WSListeners: + - name: "Websocket TLS Listener" + port: ":1882" + tls: true + - name: "Websocket Insecure Listener" + port: ":1884" + tls: false #Don't attempt to use TLS + HTTPListeners: + - name: "HTTP TLS Listener" + port: ":8080" + tls: true tls: ca-cert: #CA root certificate file path. Not empty enable bidirectional authentication. server-cert: #Server certificate file path @@ -58,4 +70,4 @@ log: max-age: 30 #MaxAge is the maximum number of days to retain old log files based on the timestamp encoded in their filename max-backups: 10 #MaxBackups is the maximum number of old log files to retain compress: true #Compress determines if the rotated log files should be compressed using gzip - level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. \ No newline at end of file + level: 0 #Log level, with supported values LevelDebug: -4, LevelInfo: 0, LevelWarn: 4, and LevelError: 8. diff --git a/cmd/single/main.go b/cmd/single/main.go index 51d0f9f..b366705 100644 --- a/cmd/single/main.go +++ b/cmd/single/main.go @@ -33,6 +33,12 @@ import ( "go.etcd.io/bbolt" ) +type mqttCmd struct { + TCP string + HTTP string + WS string +} + func pprof() { go func() { log.Info("listen pprof", "error", http.ListenAndServe(":6060", nil)) @@ -49,16 +55,20 @@ func main() { func realMain(ctx context.Context) error { var err error var confFile string + var WSList []*listeners.Websocket + var TCPList []*listeners.TCP + var HTTPList []*listeners.HTTPStats cfg := config.New() + cmd := mqttCmd{} flag.StringVar(&confFile, "conf", "", "read the program parameters from the config file") flag.UintVar(&cfg.StorageWay, "storage-way", 1, "storage way optional items:0 memory, 1 bolt, 2 badger, 3 redis") flag.UintVar(&cfg.Auth.Way, "auth-way", 0, "authentication way optional items:0 anonymous, 1 username and password, 2 clientid") flag.UintVar(&cfg.Auth.Datasource, "auth-ds", 0, "authentication datasource optional items:0 free, 1 redis, 2 mysql, 3 postgresql, 4 http") flag.StringVar(&cfg.Auth.ConfPath, "auth-path", "", "config file path should correspond to the auth-datasource") - flag.StringVar(&cfg.Mqtt.TCP, "tcp", ":1883", "network address for Mqtt TCP listener") - flag.StringVar(&cfg.Mqtt.WS, "ws", ":1882", "network address for Mqtt Websocket listener") - flag.StringVar(&cfg.Mqtt.HTTP, "http", ":8080", "network address for web info dashboard listener") + flag.StringVar(&cmd.TCP, "tcp", ":0", "network address for Mqtt TCP listener") + flag.StringVar(&cmd.WS, "ws", ":0", "network address for Mqtt Websocket listener") + flag.StringVar(&cmd.HTTP, "http", ":0", "network address for web info dashboard listener") flag.BoolVar(&cfg.Log.Enable, "log-enable", true, "log enabled or not") flag.StringVar(&cfg.Log.Filename, "log-file", "./logs/comqtt.log", "log filename") //parse arguments @@ -90,26 +100,69 @@ func realMain(ctx context.Context) error { initBridge(server, cfg) // gen tls config - var listenerConfig *listeners.Config + var listenerTLSConfig *listeners.Config + var listenerConfig *listeners.Config + if tlsConfig, err := config.GenTlsConfig(cfg); err != nil { onError(err, "") } else { if tlsConfig != nil { - listenerConfig = &listeners.Config{TLSConfig: tlsConfig} - } + listenerTLSConfig = &listeners.Config{TLSConfig: tlsConfig} + } else { + log.Info("TLS is not configured, all listeners will use unencrypted connections.") + } } - // add tcp listener - tcp := listeners.NewTCP("tcp", cfg.Mqtt.TCP, listenerConfig) - onError(server.AddListener(tcp), "add tcp listener") + // add cli tcp listener + if cmd.TCP != ":0" { + tcp := listeners.NewTCP("tcp", cmd.TCP, listenerTLSConfig) + onError(server.AddListener(tcp), "add tcp listener") + } + + // TCP Listeners from config file + TCPList = make([]*listeners.TCP, len(cfg.Mqtt.TCPListeners)) + for i := 0; i < len(cfg.Mqtt.TCPListeners); i++ { + if cfg.Mqtt.TCPListeners[i].Tls { + TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerTLSConfig) + } else { + TCPList[i] = listeners.NewTCP(cfg.Mqtt.TCPListeners[i].Name, cfg.Mqtt.TCPListeners[i].Port, listenerConfig) + } + onError(server.AddListener(TCPList[i]), "add tcp listener: " + cfg.Mqtt.TCPListeners[i].Name) + } + + // add cli websocket listener + if cmd.WS != ":0" { + ws := listeners.NewWebsocket("ws", cmd.WS, listenerTLSConfig) + onError(server.AddListener(ws), "add websocket listener") + } + + // WS Listeners from config file + WSList = make([]*listeners.Websocket, len(cfg.Mqtt.WSListeners)) + for i := 0; i < len(cfg.Mqtt.WSListeners); i++ { + if cfg.Mqtt.WSListeners[i].Tls { + WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerTLSConfig) + } else { + WSList[i] = listeners.NewWebsocket(cfg.Mqtt.WSListeners[i].Name, cfg.Mqtt.WSListeners[i].Port, listenerConfig) + } + onError(server.AddListener(WSList[i]), "add websocket listener: " + cfg.Mqtt.WSListeners[i].Name) + } - // add websocket listener - ws := listeners.NewWebsocket("ws", cfg.Mqtt.WS, listenerConfig) - onError(server.AddListener(ws), "add websocket listener") + // add cli http listener + if cmd.HTTP != ":0" { + http := listeners.NewHTTP("stats", cmd.HTTP, nil, rest.New(server).GenHandlers()) + onError(server.AddListener(http), "add http listener") + } - // add http listener - http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, rest.New(server).GenHandlers()) - onError(server.AddListener(http), "add http listener") + // HTTP Listeners from config file + HTTPList = make([]*listeners.HTTPStats, len(cfg.Mqtt.HTTPListeners)) + for i := 0; i < len(cfg.Mqtt.HTTPListeners); i++ { + if cfg.Mqtt.HTTPListeners[i].Tls { + HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerTLSConfig, rest.New(server).GenHandlers()) + } else { + HTTPList[i] = listeners.NewHTTP(cfg.Mqtt.HTTPListeners[i].Name, cfg.Mqtt.HTTPListeners[i].Port, listenerConfig, rest.New(server).GenHandlers()) + } + onError(server.AddListener(HTTPList[i]), "add http listener: " + cfg.Mqtt.HTTPListeners[i].Name) + } errCh := make(chan error, 1) // start server diff --git a/config/config.go b/config/config.go index 6f34718..4d20945 100644 --- a/config/config.go +++ b/config/config.go @@ -102,11 +102,17 @@ type auth struct { } type mqtt struct { - TCP string `yaml:"tcp"` - WS string `yaml:"ws"` - HTTP string `yaml:"http"` - Tls tls `yaml:"tls"` - Options comqtt.Options `yaml:"options"` + TCPListeners []listenerOptions `yaml:"TCPListeners"` + WSListeners []listenerOptions `yaml:"WSListeners"` + HTTPListeners []listenerOptions `yaml:"HTTPListeners"` + Tls tls `yaml:"tls"` + Options comqtt.Options `yaml:"options"` +} + +type listenerOptions struct { + Name string `yaml:"name"` + Port string `yaml:"port"` + Tls bool `yaml:"tls"` } type tls struct {