diff --git a/config/config.go b/config/config.go index 441896e..58608e7 100644 --- a/config/config.go +++ b/config/config.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/go-ini/ini" + "github.com/rs/zerolog" ) const ( @@ -33,6 +34,7 @@ type Section struct { } type Config struct { + log zerolog.Logger metadata map[string][]Parameter Sections map[string]*Section } @@ -168,9 +170,10 @@ func validate(value string, validators []Validator) error { return nil } -func NewConfig(metadata map[string][]Parameter) (*Config, error) { +func NewConfig(metadata map[string][]Parameter, logger zerolog.Logger) (*Config, error) { var conf Config conf.metadata = metadata + conf.log = logger // initialize config with default values conf.Sections = make(map[string]*Section) for sectionName, sectionMetadata := range conf.metadata { @@ -193,7 +196,6 @@ func (conf *Config) Parse(path string) error { if err != nil { return err } - //TODO: log loaded config file for sectionName, sectionMetadata := range conf.metadata { if sectionData, err := data.GetSection(sectionName); err == nil { for _, param := range sectionMetadata { @@ -202,6 +204,17 @@ func (conf *Config) Parse(path string) error { return fmt.Errorf("Failed to validate parameter %s. %s", param.Name, err.Error()) } conf.Sections[sectionName].Options[param.Name].value = paramData.Value() + conf.log.Debug(). + Str("section", sectionName). + Str("option", param.Name). + Str("value", paramData.Value()). + Msg("Using parsed configuration value.") + } else { + conf.log.Debug(). + Str("section", sectionName). + Str("option", param.Name). + Str("value", conf.Sections[sectionName].Options[param.Name].value). + Msg("Using default configuration value.") } } } diff --git a/main/main.go b/main/main.go index c32dab1..9c96ab7 100644 --- a/main/main.go +++ b/main/main.go @@ -1,22 +1,45 @@ package main import ( + "flag" "fmt" "os" "github.com/paramite/collectd-sensubility/config" "github.com/paramite/collectd-sensubility/sensu" + "github.com/rs/zerolog" ) const DEFAULT_CONFIG_PATH = "/etc/collectd-sensubility.conf" func main() { - metadata := config.GetAgentConfigMetadata() - cfg, err := config.NewConfig(metadata) + debug := flag.Bool("debug", false, "enables debugging logs") + verbose := flag.Bool("verbose", false, "enables debugging logs") + logpath := flag.String("log", "/var/log/collectd/sensubility.log", "path to log file") + flag.Parse() + + // set logging + logfile, err := os.OpenFile(*logpath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + defer logfile.Close() if err != nil { - panic(err.Error()) + fmt.Printf("Failed to open log file %s.\n", *logpath) + os.Exit(2) + } + zerolog.SetGlobalLevel(zerolog.WarnLevel) + if *verbose { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } else if *debug { + zerolog.SetGlobalLevel(zerolog.DebugLevel) } + log := zerolog.New(logfile).With().Timestamp().Logger() + // spawn entities + metadata := config.GetAgentConfigMetadata() + cfg, err := config.NewConfig(metadata, log.With().Str("component", "config-parser").Logger()) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse config file.") + os.Exit(2) + } confPath := os.Getenv("COLLECTD_SENSUBILITY_CONFIG") if confPath == "" { confPath = DEFAULT_CONFIG_PATH @@ -25,24 +48,25 @@ func main() { if err != nil { panic(err.Error()) } - sensuConnector, err := sensu.NewConnector(cfg) + sensuConnector, err := sensu.NewConnector(cfg, log.With().Str("component", "sensu-connector").Logger()) if err != nil { - fmt.Println(err.Error()) + log.Fatal().Err(err).Msg("Failed to spawn RabbitMQ connector.") os.Exit(2) } defer sensuConnector.Disconnect() - sensuScheduler, err := sensu.NewScheduler(cfg) + sensuScheduler, err := sensu.NewScheduler(cfg, log.With().Str("component", "sensu-scheduler").Logger()) if err != nil { - fmt.Println(err.Error()) + log.Fatal().Err(err).Msg("Failed to spawn check scheduler.") os.Exit(2) } - sensuExecutor, err := sensu.NewExecutor(cfg) + sensuExecutor, err := sensu.NewExecutor(cfg, log.With().Str("component", "sensu-executor").Logger()) if err != nil { - fmt.Println(err.Error()) + log.Fatal().Err(err).Msg("Failed to spawn check executor.") os.Exit(2) } + defer sensuExecutor.Clean() requests := make(chan interface{}) results := make(chan interface{}) @@ -51,6 +75,7 @@ func main() { sensuConnector.Start(requests, results) sensuScheduler.Start(requests) + // spawn worker goroutines workers := cfg.Sections["sensu"].Options["worker_count"].GetInt() for i := 0; i < workers; i++ { @@ -61,12 +86,13 @@ func main() { case sensu.CheckRequest: res, err := sensuExecutor.Execute(req) if err != nil { - //TODO: log warning + reqstr := fmt.Sprintf("Request{name=%s, command=%s, issued=%d}", req.Name, req.Command, req.Issued) + log.Error().Err(err).Str("request", reqstr).Msg("Failed to execute requested command.") continue } results <- res default: - //TODO: log warning + log.Error().Err(err).Str("request", fmt.Sprintf("%v", req)).Msg("Failed to execute requested command.") } } }() diff --git a/sensu/connector.go b/sensu/connector.go index ab7b20b..40152b0 100644 --- a/sensu/connector.go +++ b/sensu/connector.go @@ -7,6 +7,7 @@ import ( "github.com/juju/errors" "github.com/paramite/collectd-sensubility/config" + "github.com/rs/zerolog" "github.com/streadway/amqp" ) @@ -35,6 +36,7 @@ type Connector struct { ClientName string ClientAddress string KeepaliveInterval int + log zerolog.Logger queueName string exchangeName string inConnection *amqp.Connection @@ -45,7 +47,7 @@ type Connector struct { consumer <-chan amqp.Delivery } -func NewConnector(cfg *config.Config) (*Connector, error) { +func NewConnector(cfg *config.Config, logger zerolog.Logger) (*Connector, error) { var connector Connector connector.Address = cfg.Sections["sensu"].Options["connection"].GetString() connector.Subscription = cfg.Sections["sensu"].Options["subscriptions"].GetStrings(",") @@ -53,6 +55,7 @@ func NewConnector(cfg *config.Config) (*Connector, error) { connector.ClientAddress = cfg.Sections["sensu"].Options["client_address"].GetString() connector.KeepaliveInterval = cfg.Sections["sensu"].Options["keepalive_interval"].GetInt() + connector.log = logger connector.exchangeName = fmt.Sprintf("client:%s", connector.ClientName) connector.queueName = fmt.Sprintf("%s-collectd-%d", connector.ClientName, time.Now().Unix()) @@ -165,7 +168,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{}) if err == nil { outchan <- request } else { - //TODO: log warning + self.log.Warn().Err(err).Bytes("request-body", req.Body).Msg("Failed to unmarshal request body.") } } }() @@ -177,7 +180,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{}) case Result: body, err := json.Marshal(result) if err != nil { - //TODO: log warning + self.log.Error().Err(err).Msg("Failed to marshal execution result.") continue } err = self.outChannel.Publish( @@ -194,10 +197,10 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{}) Priority: 0, // 0-9 }) if err != nil { - //TODO: log warning + self.log.Error().Err(err).Msg("Failed to publish execution result.") } default: - //TODO: log warning + self.log.Error().Str("type", fmt.Sprintf("%t", res)).Msg("Received execution result with invalid type.") } } }() @@ -213,7 +216,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{}) Timestamp: time.Now().Unix(), }) if err != nil { - //TODO: log warning + self.log.Error().Err(err).Msg("Failed to marshal keepalive body.") continue } err = self.outChannel.Publish( @@ -230,7 +233,7 @@ func (self *Connector) Start(outchan chan interface{}, inchan chan interface{}) Priority: 0, // 0-9 }) if err != nil { - //TODO: log warning + self.log.Error().Err(err).Msg("Failed to publish keepalive body.") } time.Sleep(time.Duration(self.KeepaliveInterval) * time.Second) } diff --git a/sensu/executor.go b/sensu/executor.go index 362d497..804f5c6 100644 --- a/sensu/executor.go +++ b/sensu/executor.go @@ -11,6 +11,7 @@ import ( "github.com/juju/errors" "github.com/paramite/collectd-sensubility/config" + "github.com/rs/zerolog" ) const ( @@ -38,15 +39,18 @@ type Executor struct { ClientName string TmpBaseDir string ShellPath string + log zerolog.Logger scriptCache map[string]string } -func NewExecutor(cfg *config.Config) (*Executor, error) { +func NewExecutor(cfg *config.Config, logger zerolog.Logger) (*Executor, error) { var executor Executor executor.ClientName = cfg.Sections["sensu"].Options["client_name"].GetString() executor.TmpBaseDir = cfg.Sections["sensu"].Options["tmp_base_dir"].GetString() executor.ShellPath = cfg.Sections["sensu"].Options["shell_path"].GetString() + executor.scriptCache = make(map[string]string) + executor.log = logger if _, err := os.Stat(executor.TmpBaseDir); os.IsNotExist(err) { err := os.MkdirAll(executor.TmpBaseDir, 0700) if err != nil { @@ -71,6 +75,7 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) { } self.scriptCache[request.Command] = scriptFile.Name() scriptFile.Close() + self.log.Debug().Str("command", request.Command).Str("path", scriptFile.Name()).Msg("Created check script.") } //cmdParts := strings.Split(request.Command, " ") @@ -97,6 +102,7 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) { } else if strings.TrimSpace(cmdErr) != "" { status = CHECK_WARN } + result := Result{ Client: self.ClientName, Check: CheckResult{ @@ -110,11 +116,11 @@ func (self *Executor) Execute(request CheckRequest) (Result, error) { }, } + self.log.Debug().Str("command", request.Command).Int("status", status).Msg("Executed check script.") return result, nil } -func (self *Executor) Clean() error { - //TODO: delete tmp dir - - return nil +func (self *Executor) Clean() { + os.Remove(self.TmpBaseDir) + self.log.Debug().Str("dir", self.TmpBaseDir).Msg("Removed temporary directory.") } diff --git a/sensu/scheduler.go b/sensu/scheduler.go index 64825a4..e04359c 100644 --- a/sensu/scheduler.go +++ b/sensu/scheduler.go @@ -7,6 +7,7 @@ import ( "github.com/juju/errors" "github.com/paramite/collectd-sensubility/config" + "github.com/rs/zerolog" ) type Check struct { @@ -24,10 +25,12 @@ type Check struct { type Scheduler struct { Checks map[string]Check + log zerolog.Logger } -func NewScheduler(cfg *config.Config) (*Scheduler, error) { +func NewScheduler(cfg *config.Config, logger zerolog.Logger) (*Scheduler, error) { var scheduler Scheduler + scheduler.log = logger err := json.Unmarshal(cfg.Sections["sensu"].Options["checks"].GetBytes(), &scheduler.Checks) if err != nil { return nil, errors.Trace(err) @@ -42,7 +45,7 @@ func (self *Scheduler) Start(outchan chan interface{}) { cases := []reflect.SelectCase{} for name, data := range self.Checks { if data.Interval < 1 { - //TODO: log warning + self.log.Warn().Str("check", name).Int("interval", data.Interval).Msg("Configuration contains invalid interval.") continue } //TODO: use rather time.NewTicker() to be able to ticker.Stop() all tickers in Scheduler.Stop() @@ -57,6 +60,7 @@ func (self *Scheduler) Start(outchan chan interface{}) { for { index, _, _ := reflect.Select(cases) // request check execution + self.log.Debug().Str("check", checks[index]).Msg("Requesting execution of check.") outchan <- CheckRequest{ Command: self.Checks[checks[index]].Command, Name: checks[index],