From ac30f618a3b621f4bd38fd3d8050f89618c8e8bd Mon Sep 17 00:00:00 2001
From: Jonathan Boulle
Date: Tue, 16 Dec 2014 17:01:44 -0800
Subject: [PATCH 01/10] server: centralise shutdown through Monitor

The Server has a global stop channel which is used both internally (by
Monitor) and externally (by the Stop method) to shut down the server.
This is bad; invoking the method simultaneously from multiple goroutines
is not safe (and, as seen in #1044, can cause panics due to a
doubly-closed channel).

This change centralises the shutdown procedure through the Monitor, so
that when an external user of the Server wants to shut it down, it
triggers an error to propagate up from the monitor. Hence there is only
a single path in which the stop channel (which terminates all other
Server goroutines) can be closed.
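A quick illustration, separate from the patch itself: the double-close
hazard described above is easy to reproduce in isolation. A minimal
sketch (toy code, not fleet's):

    package main

    // Closing an already-closed channel panics. With both Monitor and the
    // public Stop method closing the same stop channel, two shutdown paths
    // racing each other could crash the process exactly like this:
    func main() {
        stop := make(chan bool)
        close(stop)
        close(stop) // panic: close of closed channel
    }

After the patch, the channel that terminates the other goroutines has a
single owner (the Monitor path); external callers request shutdown
through a separate kill channel instead of closing it themselves.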
---
 heart/monitor.go |  9 +++++----
 server/server.go | 48 +++++++++++++++++++++++++++++++-----------------
 2 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/heart/monitor.go b/heart/monitor.go
index 87ffb30a7..44c44ea30 100644
--- a/heart/monitor.go
+++ b/heart/monitor.go
@@ -21,6 +21,8 @@ import (
 	"github.com/coreos/fleet/log"
 )
 
+var ErrShutdown = errors.New("monitor told to shut down")
+
 func NewMonitor(ttl time.Duration) *Monitor {
 	return &Monitor{ttl, ttl / 2}
 }
@@ -32,13 +34,12 @@
 
 // Monitor ensures a Heart is still beating until a channel is closed, returning
 // an error if the heartbeats fail.
-func (m *Monitor) Monitor(hrt Heart, stop chan bool) error {
+func (m *Monitor) Monitor(hrt Heart, sdc <-chan bool) error {
 	ticker := time.Tick(m.ival)
 	for {
 		select {
-		case <-stop:
-			log.Debug("Monitor exiting due to stop signal")
-			return nil
+		case <-sdc:
+			return ErrShutdown
 		case <-ticker:
 			if _, err := m.check(hrt); err != nil {
 				return err
diff --git a/server/server.go b/server/server.go
index 565d202ff..004e377ac 100644
--- a/server/server.go
+++ b/server/server.go
@@ -17,6 +17,7 @@ package server
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net/http"
 	"time"
 
@@ -58,7 +59,8 @@ type Server struct {
 
 	engineReconcileInterval time.Duration
 
-	stop chan bool
+	killc chan bool // used to signal monitor to shutdown server
+	stopc chan bool // used to terminate all other goroutines
 }
 
 func New(cfg config.Config) (*Server, error) {
@@ -133,7 +135,8 @@ func New(cfg config.Config) (*Server, error) {
 		hrt:   hrt,
 		mon:   mon,
 		api:   apiServer,
-		stop:  nil,
+		killc: make(chan bool),
+		stopc: nil,
 		engineReconcileInterval: eIval,
 		disableEngine:           cfg.DisableEngine,
 	}
@@ -172,38 +175,49 @@ func (s *Server) Run() {
 	log.Infof("Starting server components")
 
-	s.stop = make(chan bool)
+	s.stopc = make(chan bool)
 
 	go s.Monitor()
-	go s.api.Available(s.stop)
-	go s.mach.PeriodicRefresh(machineStateRefreshInterval, s.stop)
-	go s.agent.Heartbeat(s.stop)
-	go s.aReconciler.Run(s.agent, s.stop)
+	go s.api.Available(s.stopc)
+	go s.mach.PeriodicRefresh(machineStateRefreshInterval, s.stopc)
+	go s.agent.Heartbeat(s.stopc)
+	go s.aReconciler.Run(s.agent, s.stopc)
 
 	if s.disableEngine {
 		log.Info("Not starting engine; disable-engine is set")
 	} else {
-		go s.engine.Run(s.engineReconcileInterval, s.stop)
+		go s.engine.Run(s.engineReconcileInterval, s.stopc)
 	}
 
 	beatchan := make(chan *unit.UnitStateHeartbeat)
-	go s.usGen.Run(beatchan, s.stop)
-	go s.usPub.Run(beatchan, s.stop)
+	go s.usGen.Run(beatchan, s.stopc)
+	go s.usPub.Run(beatchan, s.stopc)
 }
 
-// Monitor tracks the health of the Server. If the Server is ever deemed
-// unhealthy, the Server is restarted.
+// Monitor tracks the health of the Server and coordinates its shutdown. If the
+// Server is ever deemed unhealthy, the Server is restarted.
 func (s *Server) Monitor() {
-	err := s.mon.Monitor(s.hrt, s.stop)
-	if err != nil {
+	err := s.mon.Monitor(s.hrt, s.killc)
+	s.stop()
+	switch {
+	case err == heart.ErrShutdown:
+		log.Infof("Server monitor triggered: %v", err)
+	case err != nil:
 		log.Errorf("Server monitor triggered: %v", err)
-
-		s.Stop()
+		// Restart the server
 		s.Run()
+	default:
+		panic(fmt.Errorf("unexpected err from Monitor: %v", err))
 	}
 }
 
+// stop is used by Monitor to terminate all other server goroutines
+func (s *Server) stop() {
+	close(s.stopc)
+}
+
+// Stop is used to gracefully terminate the server by triggering the Monitor
 func (s *Server) Stop() {
-	close(s.stop)
+	close(s.killc)
 }
 
 func (s *Server) Purge() {

From 2af2610ff05fdd8fac172b27923aca532869e81c Mon Sep 17 00:00:00 2001
From: Jonathan Boulle
Date: Tue, 16 Dec 2014 17:04:55 -0800
Subject: [PATCH 02/10] fleetd: protect access to global Server instance

There are three different paths in the main fleetd goroutine that can
access the global `srv` Server - reconfigurations, shutdowns and state
dumps. Right now there's nothing preventing racy access to this
instance, so introduce a mutex to protect it.

One potential issue is that a reconfigure or state dump can "block" a
shutdown, but IMHO if this occurs it will expose behaviour that is
broken and needs to be fixed anyway.
---
 fleetd/fleetd.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/fleetd/fleetd.go b/fleetd/fleetd.go
index 9ff5001b7..5518d79ac 100755
--- a/fleetd/fleetd.go
+++ b/fleetd/fleetd.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"os"
 	"os/signal"
+	"sync"
 	"syscall"
 
 	"github.com/coreos/fleet/Godeps/_workspace/src/github.com/rakyll/globalconf"
@@ -104,9 +105,14 @@ func main() {
 	}
 	srv.Run()
 
+	srvMutex := sync.Mutex{}
+
 	reconfigure := func() {
 		log.Infof("Reloading configuration from %s", *cfgPath)
+		srvMutex.Lock()
+		defer srvMutex.Unlock()
+
 		cfg, err := getConfig(cfgset, *cfgPath)
 		if err != nil {
 			log.Fatalf(err.Error())
@@ -124,6 +130,10 @@ func main() {
 
 	shutdown := func() {
 		log.Infof("Gracefully shutting down")
+
+		srvMutex.Lock()
+		defer srvMutex.Unlock()
+
 		srv.Stop()
 		srv.Purge()
 		os.Exit(0)
@@ -132,6 +142,9 @@ func main() {
 	writeState := func() {
 		log.Infof("Dumping server state")
 
+		srvMutex.Lock()
+		defer srvMutex.Unlock()
+
 		encoded, err := json.Marshal(srv)
 		if err != nil {
 			log.Errorf("Failed to dump server state: %v", err)
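The mutex introduced above serializes the three signal-driven paths.
Reduced to a self-contained sketch (the server type and handlers here
are stand-ins, not fleet's actual code):

    package main

    import (
        "os"
        "os/signal"
        "sync"
        "syscall"
    )

    // server stands in for fleet's *server.Server.
    type server struct{}

    func (s *server) Stop()  {}
    func (s *server) Purge() {}

    func main() {
        srv := &server{}
        var srvMutex sync.Mutex // guards every access to srv

        shutdown := func() {
            srvMutex.Lock()
            defer srvMutex.Unlock()
            srv.Stop()
            srv.Purge()
            os.Exit(0)
        }
        writeState := func() {
            srvMutex.Lock()
            defer srvMutex.Unlock()
            // ... marshal srv and write the dump somewhere ...
        }

        sigc := make(chan os.Signal, 1)
        signal.Notify(sigc, syscall.SIGTERM, syscall.SIGUSR1)
        for sig := range sigc {
            if sig == syscall.SIGTERM {
                shutdown()
            } else {
                writeState()
            }
        }
    }

As the commit message concedes, a slow dump now delays shutdown; the
mutex trades that latency for the guarantee that handlers never
interleave.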
From 997bc473df7d3cb5f16c420bcd7d99f4d3a33a0b Mon Sep 17 00:00:00 2001
From: Jonathan Boulle
Date: Wed, 21 Jan 2015 10:30:55 -0800
Subject: [PATCH 03/10] server: introduce shutdown timeout

- add all background server components to a WaitGroup
- when shutting down the server, wait on this group or until a timeout
  (defaulting to one minute) before restarting or exiting.
- if timeout occurs, shut down hard; in this case the Server is not
  automatically restarted
- move Monitor into server package
- Server.Monitor -> Server.Supervise to remove ambiguity/duplication
---
 {heart => server}/monitor.go | 19 ++++----
 server/server.go             | 89 ++++++++++++++++++++++++------------
 2 files changed, 70 insertions(+), 38 deletions(-)
 rename {heart => server}/monitor.go (75%)

diff --git a/heart/monitor.go b/server/monitor.go
similarity index 75%
rename from heart/monitor.go
rename to server/monitor.go
index 44c44ea30..8f6bd612a 100644
--- a/heart/monitor.go
+++ b/server/monitor.go
@@ -12,12 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package heart
+package server
 
 import (
 	"errors"
 	"time"
 
+	"github.com/coreos/fleet/heart"
 	"github.com/coreos/fleet/log"
 )
 
@@ -32,16 +33,18 @@ type Monitor struct {
 	ival time.Duration
 }
 
-// Monitor ensures a Heart is still beating until a channel is closed, returning
-// an error if the heartbeats fail.
-func (m *Monitor) Monitor(hrt Heart, sdc <-chan bool) error {
+// Monitor periodically checks the given Heart to make sure it
+// beats successfully. If the heartbeat check fails for any
+// reason, an error is returned. If the supplied channel is
+// closed, Monitor returns ErrShutdown.
+func (m *Monitor) Monitor(hrt heart.Heart, sdc <-chan bool) error {
 	ticker := time.Tick(m.ival)
 	for {
 		select {
 		case <-sdc:
 			return ErrShutdown
 		case <-ticker:
-			if _, err := m.check(hrt); err != nil {
+			if _, err := check(hrt, m.TTL); err != nil {
 				return err
 			}
 		}
@@ -50,10 +53,10 @@
 
 // check attempts to beat a Heart several times within a timeout, returning the
 // log index at which the beat succeeded or an error
-func (m *Monitor) check(hrt Heart) (idx uint64, err error) {
+func check(hrt heart.Heart, ttl time.Duration) (idx uint64, err error) {
 	// time out after a third of the machine presence TTL, attempting
 	// the heartbeat up to four times
-	timeout := m.TTL / 3
+	timeout := ttl / 3
 	interval := timeout / 4
 
 	tchan := time.After(timeout)
@@ -64,7 +67,7 @@ func (m *Monitor) check(hrt Heart) (idx uint64, err error) {
 			err = errors.New("Monitor timed out before successful heartbeat")
 			return
 		case <-next:
-			idx, err = hrt.Beat(m.TTL)
+			idx, err = hrt.Beat(ttl)
 			if err != nil {
 				log.Debugf("Monitor heartbeat function returned err, retrying in %v: %v", interval, err)
 			}
diff --git a/server/server.go b/server/server.go
index 004e377ac..0ebf8606b 100644
--- a/server/server.go
+++ b/server/server.go
@@ -17,8 +17,8 @@ package server
 import (
 	"encoding/json"
 	"errors"
-	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client"
@@ -43,6 +43,8 @@ const (
 	// machineStateRefreshInterval is the amount of time the server will
 	// wait before each attempt to refresh the local machine state
 	machineStateRefreshInterval = time.Minute
+
+	shutdownTimeout = time.Minute
 )
 
 type Server struct {
@@ -53,14 +55,15 @@ type Server struct {
 	engine      *engine.Engine
 	mach        *machine.CoreOSMachine
 	hrt         heart.Heart
-	mon         *heart.Monitor
+	mon         *Monitor
 	api         *api.Server
 
 	disableEngine bool
 
 	engineReconcileInterval time.Duration
 
-	killc chan bool // used to signal monitor to shutdown server
-	stopc chan bool // used to terminate all other goroutines
+	killc chan bool      // used to signal monitor to shutdown server
+	stopc chan bool      // used to terminate all other goroutines
+	wg    sync.WaitGroup // used to co-ordinate shutdown
 }
 
 func New(cfg config.Config) (*Server, error) {
@@ -118,7 +121,10 @@ func New(cfg config.Config) (*Server, error) {
 	}
 
 	hrt := heart.New(reg, mach)
-	mon := heart.NewMonitor(agentTTL)
+	mon := &Monitor{
+		TTL:  agentTTL,
+		ival: agentTTL / 2,
+	}
 
 	apiServer := api.NewServer(listeners, api.NewServeMux(reg, cfg.TokenLimit))
 	apiServer.Serve()
@@ -173,49 +179,72 @@ func (s *Server) Run() {
 		time.Sleep(sleep)
 	}
 
-	log.Infof("Starting server components")
+	go s.Supervise()
 
+	log.Infof("Starting server components")
 	s.stopc = make(chan bool)
-
-	go s.Monitor()
-	go s.api.Available(s.stopc)
-	go s.mach.PeriodicRefresh(machineStateRefreshInterval, s.stopc)
-	go s.agent.Heartbeat(s.stopc)
-	go s.aReconciler.Run(s.agent, s.stopc)
+	s.wg = sync.WaitGroup{}
+	beatc := make(chan *unit.UnitStateHeartbeat)
+
+	components := []func(){
+		func() { s.api.Available(s.stopc) },
+		func() { s.mach.PeriodicRefresh(machineStateRefreshInterval, s.stopc) },
+		func() { s.agent.Heartbeat(s.stopc) },
+		func() { s.aReconciler.Run(s.agent, s.stopc) },
+		func() { s.usGen.Run(beatc, s.stopc) },
+		func() { s.usPub.Run(beatc, s.stopc) },
+	}
 	if s.disableEngine {
 		log.Info("Not starting engine; disable-engine is set")
 	} else {
-		go s.engine.Run(s.engineReconcileInterval, s.stopc)
+		components = append(components, func() { s.engine.Run(s.engineReconcileInterval, s.stopc) })
+	}
+	for _, f := range components {
+		f := f
+		s.wg.Add(1)
+		go func() {
+			f()
+			s.wg.Done()
+		}()
 	}
-
-	beatchan := make(chan *unit.UnitStateHeartbeat)
-	go s.usGen.Run(beatchan, s.stopc)
-	go s.usPub.Run(beatchan, s.stopc)
 }
 
-// Monitor tracks the health of the Server and coordinates its shutdown. If the
-// Server is ever deemed unhealthy, the Server is restarted.
-func (s *Server) Monitor() {
+// Supervise monitors the life of the Server and coordinates its shutdown.
+// A shutdown occurs when the monitor returns, either because a health check
+// fails or a user triggers a shutdown. If the shutdown is due to a health
+// check failure, the Server is restarted. Supervise will block shutdown until
+// all components have finished shutting down or a timeout occurs; if this
+// happens, the Server will not automatically be restarted.
+func (s *Server) Supervise() {
 	err := s.mon.Monitor(s.hrt, s.killc)
-	s.stop()
 	switch {
-	case err == heart.ErrShutdown:
+	case err == ErrShutdown:
 		log.Infof("Server monitor triggered: %v", err)
+		err = nil
 	case err != nil:
 		log.Errorf("Server monitor triggered: %v", err)
-		// Restart the server
-		s.Run()
 	default:
-		panic(fmt.Errorf("unexpected err from Monitor: %v", err))
+		panic("unexpected nil err from Monitor")
 	}
-}
-
-// stop is used by Monitor to terminate all other server goroutines
-func (s *Server) stop() {
 	close(s.stopc)
+	done := make(chan struct{})
+	go func() {
+		s.wg.Done()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(shutdownTimeout):
+		log.Errorf("Timed out waiting for server to shut down")
+		err = nil
+	}
+	if err != nil {
+		log.Infof("Restarting server")
+		s.Run()
+	}
 }
 
-// Stop is used to gracefully terminate the server by triggering the Monitor
+// Stop is used to gracefully terminate the server by triggering the Monitor to shut down
 func (s *Server) Stop() {
 	close(s.killc)
 }

From bcb9b9c30da8c0ff59359616583587459fce9d0f Mon Sep 17 00:00:00 2001
From: Jonathan Boulle
Date: Wed, 21 Jan 2015 13:26:02 -0800
Subject: [PATCH 04/10] *: change kill/stop channels to struct{}

Channels that are just used to "broadcast" messages (i.e. they are only
ever closed) do not need to carry any data; it is better to be more
explicit about this by using a struct{}.

Similarly, the channels can be receive-only.
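To illustrate the convention with a self-contained sketch (not code
from this series): one close() on a struct{} channel broadcasts to any
number of receivers, and the <-chan struct{} parameter type makes it a
compile-time error for a component to send on or close the channel
itself.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // worker runs until stop is closed. struct{} carries no data, so the
    // channel type documents that only the close matters.
    func worker(name string, stop <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        for {
            select {
            case <-stop:
                fmt.Println(name, "stopping")
                return
            case <-time.After(10 * time.Millisecond):
                // periodic work would happen here
            }
        }
    }

    func main() {
        stop := make(chan struct{})
        var wg sync.WaitGroup
        for _, name := range []string{"api", "agent", "engine"} {
            wg.Add(1)
            go worker(name, stop, &wg)
        }
        time.Sleep(25 * time.Millisecond)
        close(stop) // a single close broadcasts shutdown to every worker
        wg.Wait()
    }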
---
 agent/agent.go           | 4 ++--
 agent/reconcile.go       | 2 +-
 agent/unit_state.go      | 2 +-
 agent/unit_state_test.go | 6 +++---
 api/server.go            | 2 +-
 engine/engine.go         | 2 +-
 machine/coreos.go        | 2 +-
 pkg/reconcile.go         | 4 ++--
 pkg/reconcile_test.go    | 6 +++---
 server/monitor.go        | 2 +-
 server/server.go         | 6 +++---
 unit/generator.go        | 2 +-
 12 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/agent/agent.go b/agent/agent.go
index be793efeb..f22165669 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -57,11 +57,11 @@ func (a *Agent) MarshalJSON() ([]byte, error) {
 
 // Heartbeat updates the Registry periodically with an acknowledgement of the
 // Jobs this Agent is expected to be running.
-func (a *Agent) Heartbeat(stop chan bool) {
+func (a *Agent) Heartbeat(stop <-chan struct{}) {
 	a.heartbeatJobs(a.ttl, stop)
 }
 
-func (a *Agent) heartbeatJobs(ttl time.Duration, stop chan bool) {
+func (a *Agent) heartbeatJobs(ttl time.Duration, stop <-chan struct{}) {
 	heartbeat := func() {
 		machID := a.Machine.State().ID
 		launched := a.cache.launchedJobs()
diff --git a/agent/reconcile.go b/agent/reconcile.go
index ef092cbce..957adeeab 100644
--- a/agent/reconcile.go
+++ b/agent/reconcile.go
@@ -48,7 +48,7 @@ type AgentReconciler struct {
 // Run periodically attempts to reconcile the provided Agent until the stop
 // channel is closed. Run will also reconcile in reaction to events on the
 // AgentReconciler's rStream.
-func (ar *AgentReconciler) Run(a *Agent, stop chan bool) {
+func (ar *AgentReconciler) Run(a *Agent, stop <-chan struct{}) {
 	reconcile := func() {
 		start := time.Now()
 		ar.Reconcile(a)
diff --git a/agent/unit_state.go b/agent/unit_state.go
index 2f765c517..88997140f 100644
--- a/agent/unit_state.go
+++ b/agent/unit_state.go
@@ -70,7 +70,7 @@ type UnitStatePublisher struct {
 // Run caches all of the heartbeat objects from the provided channel, publishing
 // them to the Registry every 5s. Heartbeat objects are also published as they
 // are received on the channel.
-func (p *UnitStatePublisher) Run(beatchan <-chan *unit.UnitStateHeartbeat, stop chan bool) {
+func (p *UnitStatePublisher) Run(beatchan <-chan *unit.UnitStateHeartbeat, stop <-chan struct{}) {
 	go func() {
 		for {
 			select {
diff --git a/agent/unit_state_test.go b/agent/unit_state_test.go
index 3c58c4d48..cc765a181 100644
--- a/agent/unit_state_test.go
+++ b/agent/unit_state_test.go
@@ -320,7 +320,7 @@ func TestUnitStatePublisherRunTiming(t *testing.T) {
 	}
 
 	bc := make(chan *unit.UnitStateHeartbeat)
-	sc := make(chan bool)
+	sc := make(chan struct{})
 	go func() {
 		usp.Run(bc, sc)
 	}()
@@ -466,7 +466,7 @@ func TestUnitStatePublisherRunQueuing(t *testing.T) {
 		clock: clockwork.NewFakeClock(),
 	}
 	bc := make(chan *unit.UnitStateHeartbeat)
-	sc := make(chan bool)
+	sc := make(chan struct{})
 	go func() {
 		usp.Run(bc, sc)
 	}()
@@ -599,7 +599,7 @@ func TestUnitStatePublisherRunWithDelays(t *testing.T) {
 	}
 
 	bc := make(chan *unit.UnitStateHeartbeat)
-	sc := make(chan bool)
+	sc := make(chan struct{})
 
 	wgs.Add(numPublishers)
 	wgf.Add(numPublishers)
diff --git a/api/server.go b/api/server.go
index f2ca08d5f..7b6eb028d 100644
--- a/api/server.go
+++ b/api/server.go
@@ -57,7 +57,7 @@ func (s *Server) Serve() {
 
 // Available switches the Server's HTTP handler from a generic 503 Unavailable
 // response to the actual API. Once the provided channel is closed, the API is
 // torn back down and 503 responses are served.
-func (s *Server) Available(stop chan bool) {
+func (s *Server) Available(stop <-chan struct{}) {
 	s.cur = s.api
 	<-stop
 	s.cur = unavailable
diff --git a/engine/engine.go b/engine/engine.go
index bce3ebe31..dd50f4be4 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -58,7 +58,7 @@ func New(reg *registry.EtcdRegistry, lManager lease.Manager, rStream pkg.EventStream, mach machine.Machine) *Engine {
 	}
 }
 
-func (e *Engine) Run(ival time.Duration, stop chan bool) {
+func (e *Engine) Run(ival time.Duration, stop <-chan struct{}) {
 	leaseTTL := ival * 5
 	machID := e.machine.State().ID
 
diff --git a/machine/coreos.go b/machine/coreos.go
index 1564dc62b..4d9ed1046 100644
--- a/machine/coreos.go
+++ b/machine/coreos.go
@@ -84,7 +84,7 @@ func (m *CoreOSMachine) Refresh() {
 
 // PeriodicRefresh updates the current state of the CoreOSMachine at the
 // interval indicated. Operation ceases when the provided channel is closed.
-func (m *CoreOSMachine) PeriodicRefresh(interval time.Duration, stop chan bool) {
+func (m *CoreOSMachine) PeriodicRefresh(interval time.Duration, stop <-chan struct{}) {
 	ticker := time.NewTicker(interval)
 	for {
 		select {
diff --git a/pkg/reconcile.go b/pkg/reconcile.go
index 4019a3906..2406768e8 100644
--- a/pkg/reconcile.go
+++ b/pkg/reconcile.go
@@ -32,7 +32,7 @@ type EventStream interface {
 }
 
 type PeriodicReconciler interface {
-	Run(stop chan bool)
+	Run(stop <-chan struct{})
 }
 
 // NewPeriodicReconciler creates a PeriodicReconciler that will run recFunc at least every
@@ -53,7 +53,7 @@ type reconciler struct {
 	clock    clockwork.Clock
 }
 
-func (r *reconciler) Run(stop chan bool) {
+func (r *reconciler) Run(stop <-chan struct{}) {
 	trigger := make(chan struct{})
 	go func() {
 		abort := make(chan struct{})
diff --git a/pkg/reconcile_test.go b/pkg/reconcile_test.go
index 42f5c91e9..3c4807a9a 100644
--- a/pkg/reconcile_test.go
+++ b/pkg/reconcile_test.go
@@ -54,11 +54,11 @@ func TestPeriodicReconcilerRun(t *testing.T) {
 		clock: fclock,
 	}
 	// launch the PeriodicReconciler in the background
-	prDone := make(chan bool)
-	stop := make(chan bool)
+	prDone := make(chan struct{})
+	stop := make(chan struct{})
 	go func() {
 		pr.Run(stop)
-		prDone <- true
+		close(prDone)
 	}()
 	// reconcile should have occurred once at start-up
 	select {
diff --git a/server/monitor.go b/server/monitor.go
index 8f6bd612a..9514ec21f 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -37,7 +37,7 @@ type Monitor struct {
 // beats successfully. If the heartbeat check fails for any
 // reason, an error is returned. If the supplied channel is
 // closed, Monitor returns ErrShutdown.
-func (m *Monitor) Monitor(hrt heart.Heart, sdc <-chan bool) error {
+func (m *Monitor) Monitor(hrt heart.Heart, sdc <-chan struct{}) error {
 	ticker := time.Tick(m.ival)
 	for {
 		select {
diff --git a/server/server.go b/server/server.go
index 0ebf8606b..a07adde36 100644
--- a/server/server.go
+++ b/server/server.go
@@ -61,8 +61,8 @@ type Server struct {
 
 	engineReconcileInterval time.Duration
 
-	killc chan bool      // used to signal monitor to shutdown server
-	stopc chan bool      // used to terminate all other goroutines
+	killc chan struct{}  // used to signal monitor to shutdown server
+	stopc chan struct{}  // used to terminate all other goroutines
 	wg    sync.WaitGroup // used to co-ordinate shutdown
 }
 
@@ -182,7 +182,7 @@ func (s *Server) Run() {
 	go s.Supervise()
 
 	log.Infof("Starting server components")
-	s.stopc = make(chan bool)
+	s.stopc = make(chan struct{})
 	s.wg = sync.WaitGroup{}
 	beatc := make(chan *unit.UnitStateHeartbeat)
 
diff --git a/unit/generator.go b/unit/generator.go
index a2d28e76f..045c5c44c 100644
--- a/unit/generator.go
+++ b/unit/generator.go
@@ -53,7 +53,7 @@ func (g *UnitStateGenerator) MarshalJSON() ([]byte, error) {
 
 // Run periodically calls Generate and sends received *UnitStateHeartbeat
 // objects to the provided channel.
-func (g *UnitStateGenerator) Run(receiver chan<- *UnitStateHeartbeat, stop chan bool) {
+func (g *UnitStateGenerator) Run(receiver chan<- *UnitStateHeartbeat, stop <-chan struct{}) {
 	tick := time.Tick(time.Second)
 	for {
 		select {

From e44943956fb44ec2fbfc11d901c4104611b64edf Mon Sep 17 00:00:00 2001
From: Jonathan Boulle
Date: Wed, 21 Jan 2015 14:56:38 -0800
Subject: [PATCH 05/10] server: wg.Done -> wg.Wait (doh)

---
 server/server.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/server.go b/server/server.go
index a07adde36..eb76d016c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -229,7 +229,7 @@ func (s *Server) Supervise() {
 	close(s.stopc)
 	done := make(chan struct{})
 	go func() {
-		s.wg.Done()
+		s.wg.Wait()
 		close(done)
 	}()
 	select {
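The wg.Wait restored above is the heart of a wait-with-timeout idiom:
sync.WaitGroup offers no deadline of its own, so Supervise runs the
wait in a helper goroutine that closes a channel. The same idiom as a
stand-alone sketch (waitTimeout is a hypothetical helper, not part of
the series):

    package main

    import (
        "errors"
        "sync"
        "time"
    )

    // waitTimeout returns nil once wg.Wait() completes, or an error if the
    // timeout elapses first. On timeout the helper goroutine is abandoned,
    // which is acceptable when the caller is about to exit anyway.
    func waitTimeout(wg *sync.WaitGroup, d time.Duration) error {
        done := make(chan struct{})
        go func() {
            wg.Wait()
            close(done)
        }()
        select {
        case <-done:
            return nil
        case <-time.After(d):
            return errors.New("timed out waiting for components")
        }
    }

    func main() {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(20 * time.Millisecond) // a component shutting down
        }()
        if err := waitTimeout(&wg, time.Second); err != nil {
            panic(err)
        }
    }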
+func (m *Monitor) Monitor(hrt heart.Heart, sdc <-chan struct{}) (bool, error) { ticker := time.Tick(m.ival) for { select { case <-sdc: - return ErrShutdown + return true, nil case <-ticker: if _, err := check(hrt, m.TTL); err != nil { - return err + return false, err } } } diff --git a/server/server.go b/server/server.go index eb76d016c..eddda5972 100644 --- a/server/server.go +++ b/server/server.go @@ -216,15 +216,11 @@ func (s *Server) Run() { // all components have finished shutting down or a timeout occurs; if this // happens, the Server will not automatically be restarted. func (s *Server) Supervise() { - err := s.mon.Monitor(s.hrt, s.killc) - switch { - case err == ErrShutdown: - log.Infof("Server monitor triggered: %v", err) - err = nil - case err != nil: + sd, err := s.mon.Monitor(s.hrt, s.killc) + if sd { + log.Infof("Server monitor triggered: told to shut down") + } else { log.Errorf("Server monitor triggered: %v", err) - default: - panic("unexpected nil err from Monitor") } close(s.stopc) done := make(chan struct{}) @@ -236,9 +232,9 @@ func (s *Server) Supervise() { case <-done: case <-time.After(shutdownTimeout): log.Errorf("Timed out waiting for server to shut down") - err = nil + sd = true } - if err != nil { + if !sd { log.Infof("Restarting server") s.Run() } From db39286d8bcc49268078098f74e1036b3113e66c Mon Sep 17 00:00:00 2001 From: Jonathan Boulle Date: Wed, 21 Jan 2015 15:22:29 -0800 Subject: [PATCH 07/10] server: Stop -> Kill To make things a little clearer for ol' man Crawford, rename the "Stop" function to "Kill" to align better with the channel names and be a little more explicit that it is invoked in response to a kill signal. --- fleetd/fleetd.go | 4 ++-- server/server.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/fleetd/fleetd.go b/fleetd/fleetd.go index 5518d79ac..ff972d61a 100755 --- a/fleetd/fleetd.go +++ b/fleetd/fleetd.go @@ -119,7 +119,7 @@ func main() { } log.Infof("Restarting server components") - srv.Stop() + srv.Kill() srv, err = server.New(*cfg) if err != nil { @@ -134,7 +134,7 @@ func main() { srvMutex.Lock() defer srvMutex.Unlock() - srv.Stop() + srv.Kill() srv.Purge() os.Exit(0) } diff --git a/server/server.go b/server/server.go index eddda5972..96763f100 100644 --- a/server/server.go +++ b/server/server.go @@ -240,8 +240,8 @@ func (s *Server) Supervise() { } } -// Stop is used to gracefully terminate the server by triggering the Monitor to shut down -func (s *Server) Stop() { +// Kill is used to gracefully terminate the server by triggering the Monitor to shut down +func (s *Server) Kill() { close(s.killc) } From b358ee4d5bb905e0934265c19dcc242ef7244e3b Mon Sep 17 00:00:00 2001 From: Olaf Buddenhagen Date: Tue, 1 Mar 2016 20:15:18 +0100 Subject: [PATCH 08/10] Add testcase for panic on shutdown Before the monitor/shutdown handling rework, fleetd would panic with a channel double-close if it was shut down while a restart (triggered by the monitor on a failed health check) was is progress. Added a functional test for this situation -- and also for regular shutdown as a baseline. --- functional/shutdown_test.go | 102 ++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 functional/shutdown_test.go diff --git a/functional/shutdown_test.go b/functional/shutdown_test.go new file mode 100644 index 000000000..760c29ebf --- /dev/null +++ b/functional/shutdown_test.go @@ -0,0 +1,102 @@ +// Copyright 2016 CoreOS, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package functional
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/coreos/fleet/functional/platform"
+)
+
+// Check clean shutdown of fleetd under normal circumstances
+func TestShutdown(t *testing.T) {
+	cluster, err := platform.NewNspawnCluster("smoke")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cluster.Destroy()
+
+	m0, err := cluster.CreateMember()
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = cluster.WaitForNMachines(m0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Stop the fleet process.
+	if _, err = cluster.MemberCommand(m0, "sudo", "systemctl", "stop", "fleet"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check expected state after stop.
+	stdout, _ := cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=ActiveState", "fleet")
+	if strings.TrimSpace(stdout) != "ActiveState=inactive" {
+		t.Fatalf("Fleet unit not reported as inactive: %s", stdout)
+	}
+	stdout, _ = cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=Result", "fleet")
+	if strings.TrimSpace(stdout) != "Result=success" {
+		t.Fatalf("Result for fleet unit not reported as success: %s", stdout)
+	}
+}
+
+// Check clean shutdown of fleetd while automatic restart (after failed health check) is in progress
+func TestShutdownVsMonitor(t *testing.T) {
+	cluster, err := platform.NewNspawnCluster("smoke")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cluster.Destroy()
+
+	m0, err := cluster.CreateMember()
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = cluster.WaitForNMachines(m0, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Cut connection to etcd.
+	//
+	// This will result in a failed health check, and consequently the monitor will attempt a restart.
+	if _, err = cluster.MemberCommand(m0, "sudo", "iptables", "-I", "OUTPUT", "-p", "tcp", "-m", "multiport", "--dports=2379,4001", "-j", "DROP"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for the monitor to trigger the restart.
+	//
+	// This will never complete, as long as there is no connectivity.
+	if _, err = cluster.MemberCommand(m0, "sudo", "sh", "-c", `'until journalctl -u fleet | grep -q "Server monitor triggered: Monitor timed out before successful heartbeat"; do sleep 1; done'`); err != nil {
+		t.Fatal(err)
+	}
+
+	// Stop fleetd while the restart is still in progress.
+	if _, err = cluster.MemberCommand(m0, "sudo", "systemctl", "stop", "fleet"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that fleetd was shut down cleanly in spite of the concurrent restart.
+	stdout, _ := cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=ActiveState", "fleet")
+	if strings.TrimSpace(stdout) != "ActiveState=inactive" {
+		t.Fatalf("Fleet unit not reported as inactive: %s", stdout)
+	}
+	stdout, _ = cluster.MemberCommand(m0, "sudo", "systemctl", "show", "--property=Result", "fleet")
+	if strings.TrimSpace(stdout) != "Result=success" {
+		t.Fatalf("Result for fleet unit not reported as success: %s", stdout)
+	}
+}

From b43353d6125fc2957d18e146cff63748293641c2 Mon Sep 17 00:00:00 2001
From: Olaf Buddenhagen
Date: Wed, 2 Mar 2016 16:49:49 +0100
Subject: [PATCH 09/10] Functional tests: Reduce fleet agent timeout

The default agent_ttl setting of 30s makes TestShutdownVsMonitor run
pretty long. (And probably also other tests related to connectivity
loss that will be added in the future...)

With no actual network delays involved, making the timeout much shorter
shouldn't cause any trouble -- so we can simply set it globally, rather
than trying to override it only for the specific test in question.
---
 functional/util/config.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/functional/util/config.go b/functional/util/config.go
index a2f36dc8d..1c29ab2dc 100644
--- a/functional/util/config.go
+++ b/functional/util/config.go
@@ -35,6 +35,7 @@ write_files:
       etcd_servers=[{{printf "%q" .EtcdEndpoint}}]
       etcd_key_prefix={{.EtcdKeyPrefix}}
       public_ip={{.IP}}
+      agent_ttl=3s
 
 ssh_authorized_keys:
  - {{printf "%q" .PublicKey}}

From 40691b533be51319c7578241e668b3e774bd70df Mon Sep 17 00:00:00 2001
From: Olaf Buddenhagen
Date: Wed, 2 Mar 2016 13:25:30 +0100
Subject: [PATCH 10/10] Cleanup: Restore usage of NewMonitor() instead of
 inlining the code

The function moved to a different package, but otherwise can be used as
before -- I presume inlining it was some kind of accident...
---
 server/server.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/server/server.go b/server/server.go
index 96763f100..9a947e068 100644
--- a/server/server.go
+++ b/server/server.go
@@ -121,10 +121,7 @@ func New(cfg config.Config) (*Server, error) {
 	}
 
 	hrt := heart.New(reg, mach)
-	mon := &Monitor{
-		TTL:  agentTTL,
-		ival: agentTTL / 2,
-	}
+	mon := NewMonitor(agentTTL)
 
 	apiServer := api.NewServer(listeners, api.NewServeMux(reg, cfg.TokenLimit))
 	apiServer.Serve()
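Taken together, the series leaves fleetd with a shutdown flow that fits
on one page. The following toy model mirrors the final shape (kill
channel, monitor verdict, single broadcast close, bounded wait) but is
a sketch under those assumptions, not the real implementation:

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type server struct {
        killc chan struct{} // closed by Kill() to request shutdown
        stopc chan struct{} // closed once, by supervise, to stop components
        wg    sync.WaitGroup
    }

    // monitor mimics Monitor.Monitor: true means "told to shut down",
    // otherwise it reports a failed health check.
    func (s *server) monitor() (bool, error) {
        select {
        case <-s.killc:
            return true, nil
        case <-time.After(50 * time.Millisecond):
            return false, errors.New("heartbeat failed")
        }
    }

    func (s *server) supervise() {
        sd, err := s.monitor()
        if sd {
            fmt.Println("monitor triggered: told to shut down")
        } else {
            fmt.Println("monitor triggered:", err)
        }
        close(s.stopc) // the only place the stop channel is closed
        done := make(chan struct{})
        go func() { s.wg.Wait(); close(done) }()
        select {
        case <-done:
        case <-time.After(time.Minute):
            sd = true // timed out: shut down hard, do not restart
        }
        if !sd {
            fmt.Println("restarting server") // a real Supervise would call Run again
        }
    }

    func main() {
        s := &server{killc: make(chan struct{}), stopc: make(chan struct{})}
        s.wg.Add(1)
        go func() { // one background component
            defer s.wg.Done()
            <-s.stopc
        }()
        go s.supervise()
        close(s.killc) // what Kill() does
        time.Sleep(100 * time.Millisecond)
    }

Because only supervise ever closes stopc, a Kill arriving while a
monitor-triggered restart is in flight can no longer double-close
anything -- the situation TestShutdownVsMonitor exercises.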