Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move shutdownfns, terminatefns and hammerfns out of separate goroutines #15686

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions modules/graceful/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,9 @@ package graceful

import (
"context"
"fmt"
"time"
)

// Errors for context.Err()
var (
ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown")
ErrHammer = fmt.Errorf("Graceful Manager called Hammer")
ErrTerminate = fmt.Errorf("Graceful Manager called Terminate")
)

// ChannelContext is a context that wraps a channel and error as a context
type ChannelContext struct {
done <-chan struct{}
Expand Down Expand Up @@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} {
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) ShutdownContext() context.Context {
return &ChannelContext{
done: g.IsShutdown(),
err: ErrShutdown,
}
return g.shutdown
}

// HammerContext returns a context.Context that is Done at hammer
// Callers using this context should ensure that they are registered as a running server
// in order that they are waited for.
func (g *Manager) HammerContext() context.Context {
return &ChannelContext{
done: g.IsHammer(),
err: ErrHammer,
}
return g.hammer
}

// TerminateContext returns a context.Context that is Done at terminate
// Callers using this context should ensure that they are registered as a terminating server
// in order that they are waited for.
func (g *Manager) TerminateContext() context.Context {
return &ChannelContext{
done: g.IsTerminate(),
err: ErrTerminate,
}
return g.terminate
}
147 changes: 86 additions & 61 deletions modules/graceful/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,23 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
}
}()
run(func(ctx context.Context, atShutdown func()) {
go func() {
select {
case <-g.IsShutdown():
atShutdown()
case <-ctx.Done():
return
}
}()
g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtShutdown = append(g.toRunAtShutdown,
func() {
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
g.doShutdown()
}
}()
select {
case <-ctx.Done():
return
default:
atShutdown()
}
})
}, func(ctx context.Context, atTerminate func()) {
g.RunAtTerminate(ctx, atTerminate)
})
Expand Down Expand Up @@ -138,58 +147,73 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) {
// RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination
func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) {
g.terminateWaitGroup.Add(1)
go func() {
defer g.terminateWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtTerminate = append(g.toRunAtTerminate,
func() {
defer g.terminateWaitGroup.Done()
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2))
}
}()
select {
case <-ctx.Done():
return
default:
terminate()
}
}()
select {
case <-g.IsTerminate():
terminate()
case <-ctx.Done():
}
}()
})
}

// RunAtShutdown creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
go func() {
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtShutdown = append(g.toRunAtShutdown,
func() {
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2))
}
}()
select {
case <-ctx.Done():
return
default:
shutdown()
}
}()
select {
case <-g.IsShutdown():
shutdown()
case <-ctx.Done():
}
}()
})
}

// RunAtHammer creates a go-routine to run the provided function at shutdown
func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) {
go func() {
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
g.lock.Lock()
defer g.lock.Unlock()
g.toRunAtHammer = append(g.toRunAtHammer,
func() {
defer func() {
if err := recover(); err != nil {
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
}
}()
select {
case <-ctx.Done():
return
default:
hammer()
}
}()
select {
case <-g.IsHammer():
hammer()
case <-ctx.Done():
}
}()
})
}
func (g *Manager) doShutdown() {
if !g.setStateTransition(stateRunning, stateShuttingDown) {
return
}
g.lock.Lock()
close(g.shutdown)
g.shutdownCancel()
for _, fn := range g.toRunAtShutdown {
go fn()
}
g.lock.Unlock()

if setting.GracefulHammerTime >= 0 {
Expand All @@ -203,7 +227,7 @@ func (g *Manager) doShutdown() {
g.doTerminate()
g.WaitForTerminate()
g.lock.Lock()
close(g.done)
g.doneCancel()
g.lock.Unlock()
}()
}
Expand All @@ -212,10 +236,13 @@ func (g *Manager) doHammerTime(d time.Duration) {
time.Sleep(d)
g.lock.Lock()
select {
case <-g.hammer:
case <-g.hammer.Done():
default:
log.Warn("Setting Hammer condition")
close(g.hammer)
g.hammerCancel()
for _, fn := range g.toRunAtHammer {
go fn()
}
}
g.lock.Unlock()
}
Expand All @@ -226,10 +253,13 @@ func (g *Manager) doTerminate() {
}
g.lock.Lock()
select {
case <-g.terminate:
case <-g.terminate.Done():
default:
log.Warn("Terminating")
close(g.terminate)
g.terminateCancel()
for _, fn := range g.toRunAtTerminate {
go fn()
}
}
g.lock.Unlock()
}
Expand All @@ -242,22 +272,22 @@ func (g *Manager) IsChild() bool {
// IsShutdown returns a channel which will be closed at shutdown.
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
func (g *Manager) IsShutdown() <-chan struct{} {
return g.shutdown
return g.shutdown.Done()
}

// IsHammer returns a channel which will be closed at hammer
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// Servers running within the running server wait group should respond to IsHammer
// if not shutdown already
func (g *Manager) IsHammer() <-chan struct{} {
return g.hammer
return g.hammer.Done()
}

// IsTerminate returns a channel which will be closed at terminate
// The order of closure is IsShutdown, IsHammer (potentially), IsTerminate
// IsTerminate will only close once all running servers have stopped
func (g *Manager) IsTerminate() <-chan struct{} {
return g.terminate
return g.terminate.Done()
}

// ServerDone declares a running server done and subtracts one from the
Expand Down Expand Up @@ -314,25 +344,20 @@ func (g *Manager) InformCleanup() {

// Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating
func (g *Manager) Done() <-chan struct{} {
return g.done
return g.done.Done()
}

// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate
// Err allows the manager to be viewed as a context.Context done at Terminate
func (g *Manager) Err() error {
select {
case <-g.Done():
return ErrTerminate
default:
return nil
}
return g.done.Err()
}

// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values
func (g *Manager) Value(key interface{}) interface{} {
return nil
return g.done.Value(key)
}

// Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context
func (g *Manager) Deadline() (deadline time.Time, ok bool) {
return
return g.done.Deadline()
}
26 changes: 17 additions & 9 deletions modules/graceful/manager_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@ type Manager struct {
forked bool
lock *sync.RWMutex
state state
shutdown chan struct{}
hammer chan struct{}
terminate chan struct{}
done chan struct{}
shutdown context.Context
hammer context.Context
terminate context.Context
done context.Context
shutdownCancel context.CancelFunc
hammerCancel context.CancelFunc
terminateCancel context.CancelFunc
doneCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup

toRunAtShutdown []func()
toRunAtHammer []func()
toRunAtTerminate []func()
}

func newGracefulManager(ctx context.Context) *Manager {
Expand All @@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager {
}

func (g *Manager) start(ctx context.Context) {
// Make channels
g.terminate = make(chan struct{})
g.shutdown = make(chan struct{})
g.hammer = make(chan struct{})
g.done = make(chan struct{})
// Make contexts
g.terminate, g.terminateCancel = context.WithCancel(ctx)
g.shutdown, g.shutdownCancel = context.WithCancel(ctx)
g.hammer, g.hammerCancel = context.WithCancel(ctx)
g.done, g.doneCancel = context.WithCancel(ctx)

// Set the running state & handle signals
g.setState(stateRunning)
Expand Down
28 changes: 19 additions & 9 deletions modules/graceful/manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,22 @@ type Manager struct {
isChild bool
lock *sync.RWMutex
state state
shutdown chan struct{}
hammer chan struct{}
terminate chan struct{}
done chan struct{}
shutdown context.Context
hammer context.Context
terminate context.Context
done context.Context
shutdownCancel context.CancelFunc
hammerCancel context.CancelFunc
terminateCancel context.CancelFunc
doneCancel context.CancelFunc
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{}

toRunAtShutdown []func()
toRunAtHammer []func()
toRunAtTerminate []func()
}

func newGracefulManager(ctx context.Context) *Manager {
Expand All @@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager {
}

func (g *Manager) start() {
// Make contexts
g.terminate, g.terminateCancel = context.WithCancel(g.ctx)
g.shutdown, g.shutdownCancel = context.WithCancel(g.ctx)
g.hammer, g.hammerCancel = context.WithCancel(g.ctx)
g.done, g.doneCancel = context.WithCancel(g.ctx)

// Make channels
zeripath marked this conversation as resolved.
Show resolved Hide resolved
g.terminate = make(chan struct{})
g.shutdown = make(chan struct{})
g.hammer = make(chan struct{})
g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{})

// Set the running state
Expand Down Expand Up @@ -169,7 +179,7 @@ hammerLoop:
default:
log.Debug("Unexpected control request: %v", change.Cmd)
}
case <-g.hammer:
case <-g.hammer.Done():
break hammerLoop
}
}
Expand Down