diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 1ad1109b4e5bd..8cebd407a85f4 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -6,17 +6,9 @@ package graceful import ( "context" - "fmt" "time" ) -// Errors for context.Err() -var ( - ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown") - ErrHammer = fmt.Errorf("Graceful Manager called Hammer") - ErrTerminate = fmt.Errorf("Graceful Manager called Terminate") -) - // ChannelContext is a context that wraps a channel and error as a context type ChannelContext struct { done <-chan struct{} @@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) ShutdownContext() context.Context { - return &ChannelContext{ - done: g.IsShutdown(), - err: ErrShutdown, - } + return g.shutdown } // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) HammerContext() context.Context { - return &ChannelContext{ - done: g.IsHammer(), - err: ErrHammer, - } + return g.hammer } // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. func (g *Manager) TerminateContext() context.Context { - return &ChannelContext{ - done: g.IsTerminate(), - err: ErrTerminate, - } + return g.terminate } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 903d05ed21f41..a72facdf868b6 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -81,14 +81,23 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { } }() run(func(ctx context.Context, atShutdown func()) { - go func() { - select { - case <-g.IsShutdown(): - atShutdown() - case <-ctx.Done(): - return - } - }() + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + g.doShutdown() + } + }() + select { + case <-ctx.Done(): + return + default: + atShutdown() + } + }) }, func(ctx context.Context, atTerminate func()) { g.RunAtTerminate(ctx, atTerminate) }) @@ -138,58 +147,73 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { g.terminateWaitGroup.Add(1) - go func() { - defer g.terminateWaitGroup.Done() - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtTerminate = append(g.toRunAtTerminate, + func() { + defer g.terminateWaitGroup.Done() + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + terminate() } - }() - select { - case <-g.IsTerminate(): - terminate() - case <-ctx.Done(): - } - }() + }) } // RunAtShutdown creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + shutdown() } - }() - select { - case <-g.IsShutdown(): - shutdown() - case <-ctx.Done(): - } - }() + }) } // RunAtHammer creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtHammer = append(g.toRunAtHammer, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + hammer() } - }() - select { - case <-g.IsHammer(): - hammer() - case <-ctx.Done(): - } - }() + }) } func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() - close(g.shutdown) + g.shutdownCancel() + for _, fn := range g.toRunAtShutdown { + go fn() + } g.lock.Unlock() if setting.GracefulHammerTime >= 0 { @@ -203,7 +227,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - close(g.done) + g.doneCancel() g.lock.Unlock() }() } @@ -212,10 +236,13 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer: + case <-g.hammer.Done(): default: log.Warn("Setting Hammer condition") - close(g.hammer) + g.hammerCancel() + for _, fn := range g.toRunAtHammer { + go fn() + } } g.lock.Unlock() } @@ -226,10 +253,13 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate: + case <-g.terminate.Done(): default: log.Warn("Terminating") - close(g.terminate) + g.terminateCancel() + for _, fn := range g.toRunAtTerminate { + go fn() + } } g.lock.Unlock() } @@ -242,7 +272,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown + return g.shutdown.Done() } // IsHammer returns a channel which will be closed at hammer @@ -250,14 +280,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer + return g.hammer.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate + return g.terminate.Done() } // ServerDone declares a running server done and subtracts one from the @@ -314,25 +344,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done + return g.done.Done() } -// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +// Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - select { - case <-g.Done(): - return ErrTerminate - default: - return nil - } + return g.done.Err() } // Value allows the manager to be viewed as a context.Context done at Terminate, it has no values func (g *Manager) Value(key interface{}) interface{} { - return nil + return g.done.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return + return g.done.Deadline() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 540974454c34c..60d4f54280de7 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -25,13 +25,21 @@ type Manager struct { forked bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdown context.Context + hammer context.Context + terminate context.Context + done context.Context + shutdownCancel context.CancelFunc + hammerCancel context.CancelFunc + terminateCancel context.CancelFunc + doneCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start(ctx context.Context) { - // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) + // Make contexts + g.terminate, g.terminateCancel = context.WithCancel(ctx) + g.shutdown, g.shutdownCancel = context.WithCancel(ctx) + g.hammer, g.hammerCancel = context.WithCancel(ctx) + g.done, g.doneCancel = context.WithCancel(ctx) // Set the running state & handle signals g.setState(stateRunning) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index b0e0d1ce38e30..01ada4d85cc49 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -36,14 +36,22 @@ type Manager struct { isChild bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdown context.Context + hammer context.Context + terminate context.Context + done context.Context + shutdownCancel context.CancelFunc + hammerCancel context.CancelFunc + terminateCancel context.CancelFunc + doneCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup shutdownRequested chan struct{} + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { + // Make contexts + g.terminate, g.terminateCancel = context.WithCancel(g.ctx) + g.shutdown, g.shutdownCancel = context.WithCancel(g.ctx) + g.hammer, g.hammerCancel = context.WithCancel(g.ctx) + g.done, g.doneCancel = context.WithCancel(g.ctx) + // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) g.shutdownRequested = make(chan struct{}) // Set the running state @@ -169,7 +179,7 @@ hammerLoop: default: log.Debug("Unexpected control request: %v", change.Cmd) } - case <-g.hammer: + case <-g.hammer.Done(): break hammerLoop } }