Skip to content

Commit

Permalink
use context instead of channels (#40)
Browse files Browse the repository at this point in the history
* use context instead of channels

* fix typo
  • Loading branch information
tonicmuroq authored Jan 20, 2021
1 parent bd89b9c commit c7c7329
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 28 deletions.
8 changes: 6 additions & 2 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/projecteru2/agent/utils"
"github.com/projecteru2/agent/version"
"github.com/projecteru2/agent/watcher"
"github.com/sethvargo/go-signalcontext"
log "github.com/sirupsen/logrus"
cli "github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -53,16 +54,19 @@ func serve(c *cli.Context) error {
return selfmon.Monitor(config)
}

ctx, cancel := signalcontext.OnInterrupt()
defer cancel()

watcher.InitMonitor()
go watcher.LogMonitor.Serve()
go watcher.LogMonitor.Serve(ctx)

agent, err := engine.NewEngine(c.Context, config)
if err != nil {
return err
}

go api.Serve(config.API.Addr)
return agent.Run()
return agent.Run(ctx)
}

func main() {
Expand Down
13 changes: 7 additions & 6 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (h *Handler) log(w http.ResponseWriter, req *http.Request) {
}

// Serve start a api service
// blocks by http.ListenAndServe
// run this in a separated goroutine
func Serve(addr string) {
if addr == "" {
return
Expand All @@ -92,10 +94,9 @@ func Serve(addr string) {
http.Handle("/", restfulAPIServer)
http.Handle("/metrics", promhttp.Handler())
log.Infof("[apiServe] http api started %s", addr)
go func() {
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Panicf("http api failed %s", err)
}
}()

err := http.ListenAndServe(addr, nil)
if err != nil {
log.Panicf("http api failed %s", err)
}
}
18 changes: 7 additions & 11 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package engine
import (
"context"
"os"
"os/signal"
"syscall"

engineapi "github.com/docker/docker/client"
"github.com/projecteru2/agent/common"
Expand Down Expand Up @@ -86,7 +84,9 @@ func NewEngine(ctx context.Context, config *types.Config) (*Engine, error) {
}

// Run will start agent
func (e *Engine) Run() error {
// blocks by ctx.Done()
// either call this in a separated goroutine, or used in main to block main goroutine
func (e *Engine) Run(ctx context.Context) error {
// load container
if err := e.load(); err != nil {
return err
Expand All @@ -96,21 +96,17 @@ func (e *Engine) Run() error {
go e.monitor(eventChan)

// start health check
go e.healthCheck()
go e.healthCheck(ctx)

// start node heartbeat
go e.heartbeat()
go e.heartbeat(ctx)

// not tell core this node is ready
// that's means keep node status
log.Info("[Engine] Node activated")

// wait for signal
var c = make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT)
select {
case s := <-c:
log.Infof("[Engine] Agent caught system signal %s, exiting", s)
case <-ctx.Done():
log.Info("[Engine] Agent caught system signal, exiting")
return nil
case err := <-errChan:
if err := e.crash(); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions engine/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,17 @@ import (
log "github.com/sirupsen/logrus"
)

func (e *Engine) healthCheck() {
func (e *Engine) healthCheck(ctx context.Context) {
tick := time.NewTicker(time.Duration(e.config.HealthCheck.Interval) * time.Second)
defer tick.Stop()
for ; ; <-tick.C {
go e.checkAllContainers()

for {
select {
case <-tick.C:
go e.checkAllContainers()
case <-ctx.Done():
return
}
}
}

Expand Down
14 changes: 9 additions & 5 deletions engine/status_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import (

// heartbeat creates a new goroutine to report status every NodeStatusInterval seconds
// by default it will be 180s
func (e *Engine) heartbeat() {
func (e *Engine) heartbeat(ctx context.Context) {
tick := time.NewTicker(time.Duration(e.config.HeartbeatInterval) * time.Second)
// TODO this Stop is never reached
// fix this in another PR
defer tick.Stop()
for range tick.C {
go e.nodeStatusReport()

for {
select {
case <-tick.C:
go e.nodeStatusReport()
case <-ctx.Done():
return
}
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/projecteru2/core v0.0.0-20210120020951-6aab2987c576
github.com/prometheus/client_golang v1.8.0
github.com/sethvargo/go-signalcontext v0.1.0
github.com/shirou/gopsutil v3.20.11+incompatible
github.com/sirupsen/logrus v1.7.0
github.com/stretchr/objx v0.2.0 // indirect
Expand Down
6 changes: 5 additions & 1 deletion watcher/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package watcher

import (
"context"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -28,10 +29,13 @@ func InitMonitor() {
}

// Serve start monitor
func (w *Watcher) Serve() {
func (w *Watcher) Serve(ctx context.Context) {
logrus.Info("[logServe] Log monitor started")
defer logrus.Info("[logServe] Log monitor stopped")
for {
select {
case <-ctx.Done():
return
case log := <-w.LogC:
if consumers, ok := w.consumer[log.Name]; ok {
data, err := json.Marshal(log)
Expand Down

0 comments on commit c7c7329

Please sign in to comment.