Skip to content

Commit

Permalink
[Elastic Agent] Properly stop subprocess when receiving SIGTERM (elas…
Browse files Browse the repository at this point in the history
…tic#19567) (elastic#19683)

* Implement proper shutdown so spawned subprocesses are stopped correctly when Elastic Agent is signalled to stop.

* Swap shutdown order for fleet mode.

* Reorder stop in local_mode. Add to changelog.

(cherry picked from commit a820842)
  • Loading branch information
blakerouse authored Jul 7, 2020
1 parent 2f2ee67 commit 2be60b2
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 4 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
- Correctly report platform and family. {issue}18665[18665]
- Guard against empty stream.datasource and namespace {pull}18769[18769]
- Fix install service script for windows {pull}18814[18814]
- Properly stops subprocess on shutdown {pull}19567[19567]

==== New features

Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

type emitterFunc func(*config.Config) error

// ConfigHandler is capable of handling config and perform actions at it.
// ConfigHandler is capable of handling config, perform actions at it, shutdown any long running process.
type ConfigHandler interface {
HandleConfig(configrequest.Request) error
Shutdown()
}

type discoverFunc func() ([]string, error)
Expand All @@ -39,6 +40,7 @@ type Local struct {
bgContext context.Context
cancelCtxFn context.CancelFunc
log *logger.Logger
router *router
source source
agentInfo *info.AgentInfo
srv *server.Server
Expand Down Expand Up @@ -97,6 +99,7 @@ func newLocal(
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
localApplication.router = router

discover := discoverer(pathConfigFile, c.Management.Path)
emit := emitter(
Expand Down Expand Up @@ -140,9 +143,11 @@ func (l *Local) Start() error {

// Stop stops a local agent.
func (l *Local) Stop() error {
err := l.source.Stop()
l.cancelCtxFn()
l.router.Shutdown()
l.srv.Stop()
return l.source.Stop()
return err
}

// AgentInfo retrieves agent information.
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Managed struct {
api apiClient
agentInfo *info.AgentInfo
gateway *fleetGateway
router *router
srv *server.Server
}

Expand Down Expand Up @@ -144,6 +145,7 @@ func newManaged(
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
managedApplication.router = router

emit := emitter(
log,
Expand Down Expand Up @@ -225,6 +227,7 @@ func (m *Managed) Start() error {
func (m *Managed) Stop() error {
defer m.log.Info("Agent is stopped")
m.cancelCtxFn()
m.router.Shutdown()
m.srv.Stop()
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (m *mockStreamStore) Close() error {
return nil
}

func (m *mockStreamStore) Shutdown() {}

const fleetResponse = `
{
"action": "checkin",
Expand Down
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type routingKey = string
type stream interface {
Execute(*configRequest) error
Close() error
Shutdown()
}

type streamFunc func(*logger.Logger, routingKey) (stream, error)
Expand Down Expand Up @@ -112,3 +113,16 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e

return nil
}

// Shutdown shutdowns the router because Agent is stopping.
func (r *router) Shutdown() {
keys := r.routes.Keys()
for _, k := range keys {
p, ok := r.routes.Get(k)
if !ok {
continue
}
p.(stream).Shutdown()
r.routes.Remove(k)
}
}
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (m *mockStream) Close() error {
return nil
}

func (m *mockStream) Shutdown() {}

func (m *mockStream) event(op rOp, args ...interface{}) {
m.notify(m.rk, op, args...)
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func (b *operatorStream) Shutdown() {
b.configHandler.Shutdown()
}

func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (*testMonitorableApp) Started() bool { return false }
func (*testMonitorableApp) Start(_ context.Context, _ app.Taggable, cfg map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) Stop() {}
func (*testMonitorableApp) Stop() {}
func (*testMonitorableApp) Shutdown() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Application interface {
Started() bool
Start(ctx context.Context, p app.Taggable, cfg map[string]interface{}) error
Stop()
Shutdown()
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
Expand Down
7 changes: 7 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error {
return nil
}

// Shutdown handles shutting down the running apps for Agent shutdown.
func (o *Operator) Shutdown() {
for _, app := range o.apps {
app.Shutdown()
}
}

// Start starts a new process based on a configuration
// specific configuration of new process is passed
func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) {
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (a *Application) Stop() {
a.setState(state.Stopped, "Stopped")
}

// Shutdown stops the application (aka. subprocess).
func (a *Application) Shutdown() {
a.logger.Infof("Signaling application to stop because of shutdown: %s", a.id)
a.Stop()
}

// SetState sets the status of the application.
func (a *Application) SetState(status state.Status, msg string) {
a.appLock.Lock()
Expand Down
19 changes: 19 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,25 @@ func (a *Application) Stop() {
a.stopCredsListener()
}

// Shutdown disconnects the service, but doesn't signal it to stop.
func (a *Application) Shutdown() {
a.appLock.Lock()
defer a.appLock.Unlock()

if a.srvState == nil {
return
}

// destroy the application in the server, this skips sending
// the expected stopping state to the service
a.setState(state.Stopped, "Stopped")
a.srvState.Destroy()
a.srvState = nil

a.cleanUp()
a.stopCredsListener()
}

// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application is needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux darwin
// +build darwin

package process

Expand Down
44 changes: 44 additions & 0 deletions x-pack/elastic-agent/pkg/core/process/cmd_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package process

import (
"math"
"os"
"os/exec"
"path/filepath"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg ...string) *exec.Cmd {
cmd := exec.Command(path, arg...)
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, env...)
cmd.Dir = filepath.Dir(path)
if isInt32(uid) && isInt32(gid) {
cmd.SysProcAttr = &syscall.SysProcAttr{
// on shutdown all sub-processes are sent SIGTERM, in the case that the Agent dies or is -9 killed
// then also kill the children (only supported on linux)
Pdeathsig: syscall.SIGKILL,
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
NoSetGroups: true,
},
}
} else {
logger.Errorf("provided uid or gid for %s is invalid. uid: '%d' gid: '%d'.", path, uid, gid)
}

return cmd
}

func isInt32(val int) bool {
return val >= 0 && val <= math.MaxInt32
}

0 comments on commit 2be60b2

Please sign in to comment.