Skip to content

Commit

Permalink
[Elastic Agent] Add ability to re-exec agent (#20111)
Browse files Browse the repository at this point in the history
* Add ability to reexec on all platforms.

* Get it working on Windows

* Fix on mac.

* Fix vet.

* Update ShutdownChan to be receive only.
  • Loading branch information
blakerouse authored Jul 28, 2020
1 parent 9f99091 commit 5a37193
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 10 deletions.
5 changes: 5 additions & 0 deletions libbeat/logp/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ func (l *Logger) Recover(msg string) {
}
}

// Sync syncs the logger.
func (l *Logger) Sync() error {
return l.logger.Sync()
}

// L returns an unnamed global logger.
func L() *Logger {
return loadLogger().logger
Expand Down
80 changes: 80 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/reexec/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package reexec

import (
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var (
execSingleton ExecManager
execSingletonOnce sync.Once
)

// ExecManager is the interface that the global reexec manager implements.
type ExecManager interface {
// ReExec asynchronously re-executes command in the same PID and memory address
// as the currently running application.
ReExec()

// ShutdownChan returns the shutdown channel the main function should use to
// handle shutdown of the current running application.
ShutdownChan() <-chan bool

// ShutdownComplete gets called from the main function once ShutdownChan channel
// has been closed and the running application has completely shutdown.
ShutdownComplete()
}

// Manager returns the global reexec manager.
func Manager(log *logger.Logger, exec string) ExecManager {
execSingletonOnce.Do(func() {
execSingleton = newManager(log, exec)
})
return execSingleton
}

type manager struct {
logger *logger.Logger
exec string
trigger chan bool
shutdown chan bool
complete chan bool
}

func newManager(log *logger.Logger, exec string) *manager {
return &manager{
logger: log,
exec: exec,
trigger: make(chan bool),
shutdown: make(chan bool),
complete: make(chan bool),
}
}

func (m *manager) ReExec() {
go func() {
close(m.trigger)
<-m.shutdown

if err := reexec(m.logger, m.exec); err != nil {
// panic; because there is no going back, everything is shutdown
panic(err)
}

close(m.complete)
}()
}

func (m *manager) ShutdownChan() <-chan bool {
return m.trigger
}

func (m *manager) ShutdownComplete() {
close(m.shutdown)
<-m.complete
}
25 changes: 25 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/reexec/reexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package reexec

import (
"os"
"path/filepath"

"golang.org/x/sys/unix"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func reexec(log *logger.Logger, executable string) error {
// force log sync, before re-exec
_ = log.Sync()

args := []string{filepath.Base(executable)}
args = append(args, os.Args[1:]...)
return unix.Exec(executable, args, os.Environ())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package reexec

import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"

"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/mgr"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// exec performs execution on Windows.
//
// Windows does not support the ability to execute over the same PID and memory. Depending on the execution context
// different scenarios need to occur.
//
// * Services.msc - A new child process is spawned that waits for the service to stop, then restarts it and the
// current process just exits.
//
// * Sub-process - As a sub-process a new child is spawned and the current process just exits.
func reexec(log *logger.Logger, executable string) error {
svc, status, err := getService()
if err == nil {
// running as a service; spawn re-exec windows sub-process
log.Infof("Running as Windows service %s; triggering service restart", svc.Name)
args := []string{filepath.Base(executable), "reexec_windows", svc.Name, strconv.Itoa(int(status.ProcessId))}
cmd := exec.Cmd{
Path: executable,
Args: args,
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Start(); err != nil {
return err
}
} else {
log.Debugf("Discovering Windows service result: %s", err)

// running as a sub-process of another process; just execute as a child
log.Infof("Running as Windows process; spawning new child process")
args := []string{filepath.Base(executable)}
args = append(args, os.Args[1:]...)
cmd := exec.Cmd{
Path: executable,
Args: args,
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Start(); err != nil {
return err
}
}
// force log sync before exit
_ = log.Sync()
return nil
}

func getService() (*mgr.Service, svc.Status, error) {
pid := uint32(os.Getpid())
manager, err := mgr.Connect()
if err != nil {
return nil, svc.Status{}, err
}
names, err := manager.ListServices()
if err != nil {
return nil, svc.Status{}, err
}
for _, name := range names {
service, err := manager.OpenService(name)
if err != nil {
continue
}
status, err := service.Query()
if err != nil {
continue
}
if status.ProcessId == pid {
// pid match; found ourself
return service, status, nil
}
}
return nil, svc.Status{}, fmt.Errorf("failed to find service")
}
8 changes: 7 additions & 1 deletion x-pack/elastic-agent/pkg/agent/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ func NewCommandWithArgs(args []string, streams *cli.IOStreams) *cobra.Command {
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("d"))
cmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("environment"))

// subcommands
// sub-commands
run := newRunCommandWithArgs(flags, args, streams)
cmd.AddCommand(basecmd.NewDefaultCommandsWithArgs(args, streams)...)
cmd.AddCommand(run)
cmd.AddCommand(newEnrollCommandWithArgs(flags, args, streams))
cmd.AddCommand(newIntrospectCommandWithArgs(flags, args, streams))

// windows special hidden sub-command (only added on windows)
reexec := newReExecWindowsCommand(flags, args, streams)
if reexec != nil {
cmd.AddCommand(reexec)
}
cmd.Run = run.Run

return cmd
Expand Down
17 changes: 17 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/reexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !windows

package cmd

import (
"github.com/spf13/cobra"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
)

func newReExecWindowsCommand(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command {
return nil
}
76 changes: 76 additions & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/reexec_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build windows

package cmd

import (
"fmt"
"os"
"strconv"
"time"

"github.com/spf13/cobra"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/mgr"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
)

func newReExecWindowsCommand(flags *globalFlags, _ []string, streams *cli.IOStreams) *cobra.Command {
cmd := &cobra.Command{
Hidden: true,
Use: "reexec_windows <service_name> <pid>",
Short: "ReExec the windows service",
Long: "This waits for the windows service to stop then restarts it to allow self-upgrading.",
Args: cobra.ExactArgs(2),
Run: func(c *cobra.Command, args []string) {
serviceName := args[0]
servicePid, err := strconv.Atoi(args[1])
if err != nil {
fmt.Fprintf(streams.Err, "%v\n", err)
os.Exit(1)
}
err = reExec(serviceName, servicePid)
if err != nil {
fmt.Fprintf(streams.Err, "%v\n", err)
os.Exit(1)
}
},
}

return cmd
}

func reExec(serviceName string, servicePid int) error {
manager, err := mgr.Connect()
if err != nil {
return errors.New(err, "failed to connect to service manager")
}
service, err := manager.OpenService(serviceName)
if err != nil {
return errors.New(err, "failed to open service")
}
for {
status, err := service.Query()
if err != nil {
return errors.New(err, "failed to query service")
}
if status.State == svc.Stopped {
err = service.Start()
if err != nil {
return errors.New(err, "failed to start service")
}
// triggered restart; done
return nil
}
if int(status.ProcessId) != servicePid {
// already restarted; has different PID, done!
return nil
}
<-time.After(300 * time.Millisecond)
}
}
45 changes: 36 additions & 9 deletions x-pack/elastic-agent/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/reexec"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli"
Expand Down Expand Up @@ -74,6 +75,13 @@ func run(flags *globalFlags, streams *cli.IOStreams) error {
service.BeforeRun()
defer service.Cleanup()

execPath, err := os.Executable()
if err != nil {
return err
}
rexLogger := logger.Named("reexec")
rex := reexec.Manager(rexLogger, execPath)

app, err := application.New(logger, pathConfigFile)
if err != nil {
return err
Expand All @@ -91,16 +99,35 @@ func run(flags *globalFlags, streams *cli.IOStreams) error {
}
service.HandleSignals(stopBeat, cancel)

// listen for kill signal
// listen for signals
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)

select {
case <-stop:
break
case <-signals:
break
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
reexecing := false
for {
breakout := false
select {
case <-stop:
breakout = true
case <-rex.ShutdownChan():
reexecing = true
breakout = true
case sig := <-signals:
if sig == syscall.SIGHUP {
rexLogger.Infof("SIGHUP triggered re-exec")
rex.ReExec()
} else {
breakout = true
}
}
if breakout {
break
}
}

return app.Stop()
err = app.Stop()
if !reexecing {
return err
}
rex.ShutdownComplete()
return err
}

0 comments on commit 5a37193

Please sign in to comment.