Skip to content

Commit

Permalink
Graceful shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Litvinov <jnashicq@gmail.com>
  • Loading branch information
Zensey committed Apr 7, 2024
1 parent 2eb4d16 commit 37dff57
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
5 changes: 4 additions & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ func (c *Controller) startBackendControl() {
}
// now we have runtime (docker) running

hasUpdates := c.runner.CheckCurrentVersionAndUpgrades(false)
hasUpdates := false
if !c.waitForShutdownReady {
hasUpdates = c.runner.CheckCurrentVersionAndUpgrades(false)
}

if !c.runner.IsRunning() {
if c.model.Config.AutoUpgrade && hasUpdates {
Expand Down
4 changes: 2 additions & 2 deletions controller/native/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (c *Native_) CheckAndUpgradeNodeExe_(refreshVersionCache, doUpgrade bool) b
setUi()

hasUpdate := func() bool {
return (cfg.NodeExeVersion != cfg.NodeLatestTag) || cfg.NodeExeVersion == ""
return (cfg.NodeExeVersion != cfg.NodeExeLatestTag) || cfg.NodeExeVersion == ""
}

doRefresh := cfg.NodeExeVersion == "" || cfg.NodeExeLatestTag == "" ||
Expand All @@ -96,8 +96,8 @@ func (c *Native_) CheckAndUpgradeNodeExe_(refreshVersionCache, doUpgrade bool) b
return false
}
cfg.NodeExeLatestTag = release.TagName
cfg.RefreshLastUpgradeCheck()
defer func() {
cfg.RefreshLastUpgradeCheck()
cfg.Save()
}()

Expand Down
69 changes: 69 additions & 0 deletions controller/shutdown/shutdown.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package shutdown

import (
"context"
"encoding/json"
"fmt"

"github.com/advbet/sseclient"
"github.com/asaskevich/EventBus"
)

type x struct {
Type string `json:"type"`
Payload struct {
SessionsStats struct {
CountConsumers int `json:"count_consumers"`
} `json:"sessions_stats"`
} `json:"payload"`
}

type ShutdownController struct {
sse *sseclient.Client
cancel context.CancelFunc
bus EventBus.Bus
}

func NewShutdownController(bus EventBus.Bus) *ShutdownController {
return &ShutdownController{
bus: bus,
}
}

func (s *ShutdownController) eventHandler(event *sseclient.Event) error {
// log.Printf("event : %s : %s : %s", event.ID, event.Event, event.Data)
x := x{}

// set default not-zero value
x.Payload.SessionsStats.CountConsumers = -1
json.Unmarshal(event.Data, &x)
if x.Type == "state-change" {
fmt.Println(x)

if x.Payload.SessionsStats.CountConsumers == 0 {
s.bus.Publish("ready-to-shutdown")
}
}

return nil
}

func (s *ShutdownController) Start() {
addr := "http://localhost:4050/events/state"

if s.sse == nil {
c := sseclient.New(addr, "")
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
s.sse = c

go c.Start(ctx, s.eventHandler, sseclient.ReconnectOnError)
}
}

func (s *ShutdownController) Stop() {
if s.cancel != nil {
s.cancel()
s.sse = nil
}
}

0 comments on commit 37dff57

Please sign in to comment.