From 37dff57cd1d07bc4118cb48a66df346fc2691185 Mon Sep 17 00:00:00 2001 From: Anton Litvinov Date: Thu, 8 Feb 2024 15:17:34 +0400 Subject: [PATCH] Graceful shutdown Signed-off-by: Anton Litvinov --- controller/controller.go | 5 ++- controller/native/install.go | 4 +- controller/shutdown/shutdown.go | 69 +++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 controller/shutdown/shutdown.go diff --git a/controller/controller.go b/controller/controller.go index 10578d2..d0c4ff3 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -119,7 +119,10 @@ func (c *Controller) startBackendControl() { } // now we have runtime (docker) running - hasUpdates := c.runner.CheckCurrentVersionAndUpgrades(false) + hasUpdates := false + if !c.waitForShutdownReady { + hasUpdates = c.runner.CheckCurrentVersionAndUpgrades(false) + } if !c.runner.IsRunning() { if c.model.Config.AutoUpgrade && hasUpdates { diff --git a/controller/native/install.go b/controller/native/install.go index 112f493..879897a 100644 --- a/controller/native/install.go +++ b/controller/native/install.go @@ -77,7 +77,7 @@ func (c *Native_) CheckAndUpgradeNodeExe_(refreshVersionCache, doUpgrade bool) b setUi() hasUpdate := func() bool { - return (cfg.NodeExeVersion != cfg.NodeLatestTag) || cfg.NodeExeVersion == "" + return (cfg.NodeExeVersion != cfg.NodeExeLatestTag) || cfg.NodeExeVersion == "" } doRefresh := cfg.NodeExeVersion == "" || cfg.NodeExeLatestTag == "" || @@ -96,8 +96,8 @@ func (c *Native_) CheckAndUpgradeNodeExe_(refreshVersionCache, doUpgrade bool) b return false } cfg.NodeExeLatestTag = release.TagName + cfg.RefreshLastUpgradeCheck() defer func() { - cfg.RefreshLastUpgradeCheck() cfg.Save() }() diff --git a/controller/shutdown/shutdown.go b/controller/shutdown/shutdown.go new file mode 100644 index 0000000..666333b --- /dev/null +++ b/controller/shutdown/shutdown.go @@ -0,0 +1,69 @@ +package shutdown + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/advbet/sseclient" + "github.com/asaskevich/EventBus" +) + +type x struct { + Type string `json:"type"` + Payload struct { + SessionsStats struct { + CountConsumers int `json:"count_consumers"` + } `json:"sessions_stats"` + } `json:"payload"` +} + +type ShutdownController struct { + sse *sseclient.Client + cancel context.CancelFunc + bus EventBus.Bus +} + +func NewShutdownController(bus EventBus.Bus) *ShutdownController { + return &ShutdownController{ + bus: bus, + } +} + +func (s *ShutdownController) eventHandler(event *sseclient.Event) error { + // log.Printf("event : %s : %s : %s", event.ID, event.Event, event.Data) + x := x{} + + // set default not-zero value + x.Payload.SessionsStats.CountConsumers = -1 + json.Unmarshal(event.Data, &x) + if x.Type == "state-change" { + fmt.Println(x) + + if x.Payload.SessionsStats.CountConsumers == 0 { + s.bus.Publish("ready-to-shutdown") + } + } + + return nil +} + +func (s *ShutdownController) Start() { + addr := "http://localhost:4050/events/state" + + if s.sse == nil { + c := sseclient.New(addr, "") + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.sse = c + + go c.Start(ctx, s.eventHandler, sseclient.ReconnectOnError) + } +} + +func (s *ShutdownController) Stop() { + if s.cancel != nil { + s.cancel() + s.sse = nil + } +}