Skip to content

Commit

Permalink
improve several things
Browse files Browse the repository at this point in the history
- add sphinx
- support tls
- performance improvements
- add authenticator interface (future use)
  • Loading branch information
chobie committed Sep 29, 2014
1 parent 4ff9e67 commit 1855d09
Show file tree
Hide file tree
Showing 38 changed files with 1,360 additions and 535 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Momonga MQTT

# About

Momonga is a Golang MQTT library
Momonga is a Golang MQTT library to make MQTT client and Server easily.

This project has been started as my hobby project, to learn MQTT3.1.1 behaviour.
This might contains some unreliable codes. Yes, contributions are very welcome.
Expand Down
2 changes: 1 addition & 1 deletion TODO
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ User Name MUST be a UTF-8 encoded string as defined in Section ‎1.5.3.
[MQTT-3.1.4-1]
The Server MUST validate that the CONNECT Packet conforms to section 3.1 and close the Network Connection without sending a CONNACK if it does not conform.

[MQTT-3.1.4-2]
[x][MQTT-3.1.4-2]
If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.

[MQTT-3.1.4-3]
Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ func (self *Client) Connect() error {
}

if len(self.Option.UserName) > 0 {
msg.UserName = self.Option.UserName
msg.UserName = []byte(self.Option.UserName)
}

if len(self.Option.Password) > 0 {
msg.Password = self.Option.Password
msg.Password = []byte(self.Option.Password)
}

self.Connection.SetState(STATE_CONNECTING)
Expand Down
25 changes: 21 additions & 4 deletions common/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,42 @@ func (e *ConnectionResetError) Error() string {
type Connection interface {
//WriteMessage(request mqtt.Message) error
WriteMessageQueue(request mqtt.Message)

Close() error

SetState(State)

GetState() State

ResetState()

ReadMessage() (mqtt.Message, error)

IsAlived() bool

SetWillMessage(mqtt.WillMessage)

GetWillMessage() *mqtt.WillMessage

HasWillMessage() bool

GetOutGoingTable() *util.MessageTable

GetSubscribedTopics() map[string]*SubscribeSet

AppendSubscribedTopic(string, *SubscribeSet)

RemoveSubscribedTopic(string)

SetKeepaliveInterval(int)

GetId() string
GetGuid() util.Guid
SetGuid(util.Guid)

GetRealId() string

SetId(string)
DisableClearSession()
ShouldClearSession() bool

DisableCleanSession()

ShouldCleanSession() bool
}
33 changes: 15 additions & 18 deletions common/myconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"bufio"
"fmt"
codec "github.com/chobie/momonga/encoding/mqtt"
"github.com/chobie/momonga/flags"
log "github.com/chobie/momonga/logger"
"github.com/chobie/momonga/util"
"io"
Expand Down Expand Up @@ -45,9 +44,9 @@ type MyConnection struct {
Reader *bufio.Reader
Writer *bufio.Writer
KeepLoop bool
guid util.Guid
balancer *util.Balancer
logger log.Logger
MaxMessageSize int
}

func (self *MyConnection) SetOpaque(opaque interface{}) {
Expand All @@ -60,6 +59,7 @@ func (self *MyConnection) GetOpaque() interface{} {

type MyConfig struct {
QueueSize int
MaxMessageSize int
OfflineQueueSize int
Keepalive int
WritePerSec int
Expand All @@ -68,11 +68,16 @@ type MyConfig struct {

var defaultConfig = MyConfig{
QueueSize: 8192,
MaxMessageSize: 8192,
OfflineQueueSize: 1024,
Keepalive: 0,
WritePerSec: 0,
}

func GetDefaultMyConfig() *MyConfig {
return &defaultConfig
}

// TODO: どっかで綺麗にしたい
func NewMyConnection(conf *MyConfig) *MyConnection {
if conf == nil {
Expand All @@ -94,6 +99,7 @@ func NewMyConnection(conf *MyConfig) *MyConnection {
Keepalive: conf.Keepalive,
State: STATE_INIT,
Closed: make(chan bool),
MaxMessageSize: 8192,
}

c.logger = log.Global
Expand All @@ -106,6 +112,9 @@ func NewMyConnection(conf *MyConfig) *MyConnection {
PerSec: conf.WritePerSec,
}
}
if conf.MaxMessageSize > 0 {
c.MaxMessageSize = conf.MaxMessageSize
}

c.Events["connected"] = func() {
c.State = STATE_CONNECTED
Expand Down Expand Up @@ -531,14 +540,6 @@ func (self *MyConnection) ReadMessage() (codec.Message, error) {
return self.ParseMessage()
}

func (self *MyConnection) GetGuid() util.Guid {
return self.guid
}

func (self *MyConnection) SetGuid(id util.Guid) {
self.guid = id
}

func (self *MyConnection) ParseMessage() (codec.Message, error) {
// self.Mutex.RLock()
// defer self.Mutex.RUnlock()
Expand All @@ -552,7 +553,7 @@ func (self *MyConnection) ParseMessage() (codec.Message, error) {
if self.Reader == nil {
panic("reader is null")
}
message, err := codec.ParseMessage(self.Reader, 8192)
message, err := codec.ParseMessage(self.Reader, self.MaxMessageSize)
if err != nil {
self.logger.Debug(">>> Message: %s\n", err)
if v, ok := self.Events["error"].(func(error)); ok {
Expand Down Expand Up @@ -751,22 +752,18 @@ func (self *MyConnection) GetRealId() string {
}

func (self *MyConnection) GetId() string {
if flags.Mflags["experimental.newid"] {
return fmt.Sprintf("%s:%d", self.Id, self.guid)
} else {
return self.Id
}
return self.Id
}

func (self *MyConnection) SetKeepaliveInterval(interval int) {
self.Keepalive = interval
}

func (self *MyConnection) DisableClearSession() {
func (self *MyConnection) DisableCleanSession() {
self.CleanSession = false
}

func (self *MyConnection) ShouldClearSession() bool {
func (self *MyConnection) ShouldCleanSession() bool {
return self.CleanSession
}

Expand Down
29 changes: 27 additions & 2 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ log_file = "stdout"
log_level = "debug"
pid_file = ""

user = "momonga"

bind_address = "localhost"

port = 1883
max_connections = 100

max_connections = 1000

# unix socket path (default disabled)
socket = ""
connection_timeout = 10

Expand All @@ -17,9 +23,28 @@ tls_port = 8883
cafile = ""
certfile = ""
keyfile = ""

httpport = 9000
websocket_mount = "/mqtt"

# provide http debug features (see server/http.go)
http_debug = true

[engine]
enable_permission = false
queue_size = 8192
lock_pool_size = 64
enable_sys = true
acceptor_count = "cpu"
lock_pool_size = 64
fanout_worker_count = "cpu"

max_inflight_messages = 10000
max_queued_messages = 10000
retry_interval = 20

# max is 268435455 bytes
message_size_limit = 8192
allow_anonymous = true

[engine.authenticator]
type = "empty"
81 changes: 60 additions & 21 deletions configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,40 @@ type Config struct {
}

type Engine struct {
QueueSize int `toml:queue_size`
AcceptorCount string `toml:acceptor_count`
LockPoolSize int `toml:lock_pool_size`
EnableSys bool `toml:"enable_sys"`
FanoutWorkerCount string `toml:fanout_worker_count`
QueueSize int `toml:"queue_size"`
AcceptorCount string `toml:"acceptor_count"`
LockPoolSize int `toml:"lock_pool_size"`
EnableSys bool `toml:"enable_sys"`
FanoutWorkerCount string `toml:"fanout_worker_count"`
AllowAnonymous bool `toml:"allow_anonymous"`
Authenticators []Authenticator `toml:"authenticator"`
EnablePermission bool `toml:"enable_permission"`
}

type Authenticator struct {
Type string `toml:"type"`
}

type Server struct {
LogFile string `toml:"log_file"`
LogLevel string `toml:"log_level"`
PidFile string `toml:"pid_file"`
BindAddress string `toml:"bind_address"`
Port int `toml:"port"`
Socket string `toml:"socket"`
HttpPort int `toml:"http_port"`
WebSocketMount string `toml:"websocket_mount"`
User string `toml:"user"`
LogFile string `toml:"log_file"`
LogLevel string `toml:"log_level"`
PidFile string `toml:"pid_file"`
BindAddress string `toml:"bind_address"`
Port int `toml:"port"`
EnableTls bool `toml:"enable_tls"`
TlsPort int `toml:"tls_port"`
Keyfile string `toml:"keyfile"`
Cafile string `toml:"cafile"`
Certfile string `toml:"certfile"`
Socket string `toml:"socket"`
HttpPort int `toml:"http_port"`
WebSocketMount string `toml:"websocket_mount"`
HttpDebug bool `toml:"http_debug"`
MaxInflightMessages int `toml:"max_inflight_messages"`
MaxQueuedMessages int `toml:"max_queued_messages"`
RetryInterval int `toml:"retry_interval"`
MessageSizeLimit int `toml:"message_size_limit"`
}

func (self *Config) GetQueueSize() int {
Expand Down Expand Up @@ -76,6 +94,13 @@ func (self *Config) GetListenAddress() string {
return fmt.Sprintf("%s:%d", self.Server.BindAddress, self.Server.Port)
}

func (self *Config) GetTlsListenAddress() string {
if self.Server.TlsPort <= 0 {
return ""
}
return fmt.Sprintf("%s:%d", self.Server.BindAddress, self.Server.TlsPort)
}

func (self *Config) GetSSLListenAddress() string {
if self.Server.Port <= 0 {
return ""
Expand All @@ -87,6 +112,10 @@ func (self *Config) GetSocketAddress() string {
return self.Server.Socket
}

func (self *Config) GetAuthenticators() []Authenticator {
return self.Engine.Authenticators
}

func DefaultConfiguration() *Config {
return &Config{
Engine: Engine{
Expand All @@ -95,16 +124,26 @@ func DefaultConfiguration() *Config {
FanoutWorkerCount: "cpu",
LockPoolSize: 64,
EnableSys: true,
AllowAnonymous: true,
EnablePermission: false,
},
Server: Server{
LogFile: "stdout",
LogLevel: "debug",
PidFile: "",
BindAddress: "localhost",
Port: 1883,
Socket: "",
HttpPort: 9000,
WebSocketMount: "/mqtt",
User: "momonga",
LogFile: "stdout",
LogLevel: "debug",
PidFile: "",
BindAddress: "localhost",
Port: 1883,
Socket: "",
HttpPort: 9000,
WebSocketMount: "/mqtt",
HttpDebug: true,
MaxInflightMessages: 10000,
MaxQueuedMessages: 10000,
RetryInterval: 20,
MessageSizeLimit: 8192,
EnableTls: false,
TlsPort: 8883,
},
}
}
Expand Down
Loading

0 comments on commit 1855d09

Please sign in to comment.