From 35605b796f3d16b1ea35557e08df0ae2d466710e Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 25 Mar 2016 14:39:54 -0400 Subject: [PATCH] Document Beater Interface Add godocs describing how the Beater interface is used. Read YAML configuration file one time (instead of twice) Remove os.Exit calls from Packetbeat Print Config OK to stdout on succesful -configtest. Changed HandleFlags() to return an error Fixes #1234 --- CHANGELOG.asciidoc | 2 + filebeat/beater/filebeat.go | 3 +- filebeat/tests/system/test_prospector.py | 4 +- libbeat/beat/beat.go | 426 +++++++++++------------ libbeat/beat/beat_test.go | 20 +- libbeat/beat/errors.go | 30 ++ libbeat/cfgfile/cfgfile.go | 52 +-- libbeat/tests/system/test_base.py | 15 +- metricbeat/beater/metricbeat.go | 3 +- metricbeat/tests/system/test_base.py | 2 +- packetbeat/beater/packetbeat.go | 56 ++- topbeat/beater/topbeat.go | 3 +- winlogbeat/beater/winlogbeat.go | 3 +- 13 files changed, 306 insertions(+), 313 deletions(-) create mode 100644 libbeat/beat/errors.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 744f93a8b96..16ef5cbadc0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -18,6 +18,7 @@ https://github.com/elastic/beats/compare/v1.1.2...master[Check the HEAD diff] - Run function to start a beat no returns an error instead of directly exiting. {pull}771[771] - Move event preprocessor applying GeoIP to packetbeat {pull}772[772] - Add include_fields and drop_fields as part of generic filtering {pull}1120[1120] +- The method signature of HandleFlags() was changed to allow returning an error {pull}1249[1249] *Packetbeat* - Rename output fields in the dns package. Former flag `recursion_allowed` becomes `recursion_available`. {pull}803[803] @@ -83,6 +84,7 @@ https://github.com/elastic/beats/compare/v1.1.2...master[Check the HEAD diff] - Ensure proper shutdown of libbeat. {pull}1075[1075] - Add `fields` and `fields_under_root` options under the `shipper` configuration {pull}1092[1092] - Add the ability to use a SOCKS5 proxy with the Logstash output {issue}823[823] +- The `-configtest` flag will now print "Config OK" to stdout on success {pull}1249[1249] *Packetbeat* - Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803] diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 42f90122d1e..a19827e92e5 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" cfg "github.com/elastic/beats/filebeat/config" @@ -32,7 +31,7 @@ func New() *Filebeat { func (fb *Filebeat) Config(b *beat.Beat) error { // Load Base config - err := cfgfile.Read(&fb.FbConfig, "") + err := b.RawConfig.Unpack(&fb.FbConfig) if err != nil { return fmt.Errorf("Error reading config file: %v", err) diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index 608e39bf6a7..c3f12a05b32 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -250,7 +250,7 @@ def test_shutdown_no_prospectors(self): self.wait_until( lambda: self.log_contains( - "shutting down"), + "Exiting"), max_timeout=10) filebeat.check_kill_and_wait(exit_code=1) @@ -273,7 +273,7 @@ def test_no_paths_defined(self): self.wait_until( lambda: self.log_contains( - "shutting down"), + "Exiting"), max_timeout=10) filebeat.check_kill_and_wait(exit_code=1) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 17c45a31ea4..15822d6be49 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -1,84 +1,105 @@ /* +Package beat provides the functions required to manage the life-cycle of a Beat. +It provides the standard mechanism for launching a Beat. It manages +configuration, logging, and publisher initialization and registers a signal +handler to gracefully stop the process. -Package beat provides the basic environment for each beat. +Each Beat implementation must implement the Beater interface and may optionally +implement the FlagsHandler interface. See the Beater interface documentation for +more details. -Each beat implementation has to implement the beater interface. +To use this package, create a simple main that invokes the Run() function. + func main() { + if err := beat.Run("mybeat", myVersion, beater.New()); err != nil { + os.Exit(1) + } + } -# Start / Stop / Exit a Beat +In the example above, the beater package contains the implementation of the +Beater interface and the New() method returns a new instance of Beater. The +Beater implementation is placed into its own package so that it can be reused +or combined with other Beats. -A beat is start by calling the Run(name string, version string, bt Beater) function and passing the beater object. -This will create new beat and will Start the beat in its own go process. The Run function is blocked until -the Beat.exit channel is closed. This can be done through calling Beat.Exit(). This happens for example when CTRL-C -is pressed. - -A beat can be stopped and started again through beat.Stop and beat.Start. When starting a beat again, it is important to -run it again in it's own go process. To allow a beat to be properly reastarted, it is important that Beater.Stop() properly -closes all channels and go processes. - -In case a beat should not run as a long running process, the beater implementation must make sure to call Beat.Exit() -when the task is completed to stop the beat. +Recommendations + * Use the logp package for logging rather than writing to stdout or stderr. + * Do not call os.Exit in any of your code. Return an error instead. Or if your + code needs to exit without an error, return beat.GracefulExit. */ package beat import ( "flag" "fmt" + "os" "runtime" - "sync" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/filter" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/service" + svc "github.com/elastic/beats/libbeat/service" "github.com/satori/go.uuid" ) -// Beater interface that every beat must use +var ( + printVersion = flag.Bool("version", false, "Print the version and exit") +) + +var debugf = logp.MakeDebug("beat") + +// Beater is the interface that must be implemented every Beat. The full +// lifecycle of a Beat instance is managed through this interface. +// +// Life-cycle of Beater +// +// The four operational methods are always invoked serially in the following +// order: +// +// Config -> Setup -> Run -> Cleanup +// +// The Stop() method is invoked the first time (and only the first time) a +// shutdown signal is received. The Stop() method is eligible to be invoked +// at any point after Setup() completes (this ensures that the Beater +// implementation is fully initialized before Stop() can be invoked). +// +// The Cleanup() method is guaranteed to be invoked upon shutdown iff the Beater +// reaches the Setup stage. For example, if there is a failure in the +// Config stage then Cleanup will not be invoked. type Beater interface { - Config(*Beat) error - Setup(*Beat) error - Run(*Beat) error - Cleanup(*Beat) error - Stop() + Config(*Beat) error // Read and validate configuration. + Setup(*Beat) error // Initialize the Beat. + Run(*Beat) error // The main event loop. This method should block until signalled to stop by an invocation of the Stop() method. + Cleanup(*Beat) error // Cleanup is invoked to perform any final clean-up prior to exiting. + Stop() // Stop is invoked to signal that the Run method should finish its execution. It will be invoked at most once. } -// FlagsHandler (optional) Beater extension for -// handling flags input on startup. The HandleFlags callback will -// be called after parsing the command line arguments and handling -// the '--help' or '--version' flags. +// FlagsHandler is an interface that can optionally be implemented by a Beat +// if it needs to process command line flags on startup. If implemented, the +// HandleFlags method will be invoked after parsing the command line flags +// and before any of the Beater interface methods are invoked. There will be +// no callback when '-help' or '-version' are specified. type FlagsHandler interface { - HandleFlags(*Beat) + HandleFlags(*Beat) error // Handle any custom command line arguments. } -// Beat struct contains the basic beat information +// Beat contains the basic beat data and the publisher client used to publish +// events. type Beat struct { - Name string - Version string - Config *BeatConfig - BT Beater - Publisher *publisher.PublisherType - Events publisher.Client - UUID uuid.UUID - - exit chan struct{} - error error - state int8 - stateMutex sync.Mutex - callback sync.Once + Name string // Beat name. + Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version. + UUID uuid.UUID // ID assigned to a Beat instance. + BT Beater // Beater implementation. + RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. + Config BeatConfig // Common Beat configuration data. + Events publisher.Client // Client used for publishing events. + Publisher *publisher.PublisherType // Publisher + + filters *filter.FilterList // Filters } -// Defaults for config variables which are not set -const ( - StopState = 0 - ConfigState = 1 - SetupState = 2 - RunState = 3 -) - // BeatConfig struct contains the basic configuration of every beat type BeatConfig struct { Output map[string]*common.Config @@ -87,246 +108,193 @@ type BeatConfig struct { Filter []filter.FilterConfig } -var printVersion *bool +// Run initializes and runs a Beater implementation. name is the name of the +// Beat (e.g. packetbeat or topbeat). version is version number of the Beater +// implementation. bt is Beater implementation to run. +func Run(name, version string, bt Beater) error { + return newInstance(name, version, bt).launch(true) +} -// Channel that is closed as soon as the beat should exit -func init() { - printVersion = flag.Bool("version", false, "Print version and exit") +// instance contains everything related to a single instance of a beat. +type instance struct { + data *Beat + beater Beater } -// NewBeat initiates a new beat object -func NewBeat(name string, version string, bt Beater) *Beat { +// newInstance creates and initializes a new Beat instance. +func newInstance(name string, version string, bt Beater) *instance { if version == "" { version = defaultBeatVersion } - b := Beat{ - Version: version, - Name: name, - BT: bt, - UUID: uuid.NewV4(), - - exit: make(chan struct{}), - state: StopState, - } - - return &b -} - -// Run initiates and runs a new beat object -func Run(name string, version string, bt Beater) error { - - b := NewBeat(name, version, bt) - - // Runs beat inside a go process - go func() { - err := b.Start() - - if err != nil { - // TODO: detect if logging was already fully setup or not - fmt.Printf("Start error: %v\n", err) - logp.Critical("Start error: %v", err) - b.error = err - } - - // If start finishes, exit has to be called. This requires start to be blocking - // which is currently the default. - b.Exit() - }() - - // Waits until beats channel is closed - select { - case <-b.exit: - b.Stop() - logp.Info("Exit beat completed") - return b.error - } -} -// Start starts the Beat by parsing and interpreting the command line flags, -// loading and parsing the configuration file, and running the Beat. This -// method blocks until the Beat exits. If an error occurs while initializing -// or running the Beat it will be returned. -func (b *Beat) Start() error { - // Additional command line args are used to overwrite config options - err, exit := b.CommandLineSetup() - if err != nil { - return fmt.Errorf("fails to load command line setup: %v\n", err) - } - - if exit { - return nil + return &instance{ + data: &Beat{ + Name: name, + Version: version, + UUID: uuid.NewV4(), + BT: bt, + }, + beater: bt, } - - // Loads base config - err = b.LoadConfig() - if err != nil { - return fmt.Errorf("fails to load the config: %v\n", err) - } - - // Configures beat - err = b.BT.Config(b) - if err != nil { - return fmt.Errorf("fails to load the beat config: %v\n", err) - } - b.setState(ConfigState) - - // Run beat. This calls first beater.Setup, - // then beater.Run and beater.Cleanup in the end - return b.Run() } -// CommandLineSetup reads and parses the default command line params -// To set additional cmd line args use the beat.CmdLine type before calling the function -// The second return param is to detect if system should exit. True if should exit -// Exit can also be without error -func (beat *Beat) CommandLineSetup() (error, bool) { - - // The -c flag is treated separately because it needs the Beat name - err := cfgfile.ChangeDefaultCfgfileFlag(beat.Name) +// handleFlags parses the command line flags. It handles the '-version' flag +// and invokes the HandleFlags callback if implemented by the Beat. +func (bc *instance) handleFlags() error { + // Due to a dependence upon the beat name, the default config file path + // must be updated prior to CLI flag handling. + err := cfgfile.ChangeDefaultCfgfileFlag(bc.data.Name) if err != nil { - return fmt.Errorf("failed to fix the -c flag: %v\n", err), true + return fmt.Errorf("failed to set default config file path: %v", err) } flag.Parse() if *printVersion { - fmt.Printf("%s version %s (%s)\n", beat.Name, beat.Version, runtime.GOARCH) - return nil, true + fmt.Printf("%s version %s (%s), libbeat %s\n", bc.data.Name, + bc.data.Version, runtime.GOARCH, defaultBeatVersion) + return GracefulExit } - // if beater implements CLIFlags for additional CLI handling, call it now - if flagsHandler, ok := beat.BT.(FlagsHandler); ok { - flagsHandler.HandleFlags(beat) + // Invoke HandleFlags if FlagsHandler is implemented. + if flagsHandler, ok := bc.beater.(FlagsHandler); ok { + err = flagsHandler.HandleFlags(bc.data) } - return nil, false + return err } -// LoadConfig inits the config file and reads the default config information -// into Beat.Config. It exists the processes in case of errors. -func (b *Beat) LoadConfig() error { - - err := cfgfile.Read(&b.Config, "") +// config reads the configuration file from disk, parses the common options +// defined in BeatConfig, initializes logging, and set GOMAXPROCS if defined +// in the config. Lastly it invokes the Config method implemented by the beat. +func (bc *instance) config() error { + var err error + bc.data.RawConfig, err = cfgfile.Load("") if err != nil { - return fmt.Errorf("loading config file error: %v\n", err) + return fmt.Errorf("error loading config file: %v", err) } - err = logp.Init(b.Name, &b.Config.Logging) + err = bc.data.RawConfig.Unpack(&bc.data.Config) if err != nil { - return fmt.Errorf("error initializing logging: %v\n", err) + return fmt.Errorf("error unpacking config data: %v", err) } + err = logp.Init(bc.data.Name, &bc.data.Config.Logging) + if err != nil { + return fmt.Errorf("error initializing logging: %v", err) + } // Disable stderr logging if requested by cmdline flag logp.SetStderr() - logp.Debug("beat", "Initializing output plugins") + bc.data.filters, err = filter.New(bc.data.Config.Filter) + if err != nil { + return fmt.Errorf("error initializing filters: %v", err) + } + debugf("Filters: %+v", bc.data.filters) - if b.Config.Shipper.MaxProcs != nil { - maxProcs := *b.Config.Shipper.MaxProcs + if bc.data.Config.Shipper.MaxProcs != nil { + maxProcs := *bc.data.Config.Shipper.MaxProcs if maxProcs > 0 { runtime.GOMAXPROCS(maxProcs) } } - pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper) + return bc.beater.Config(bc.data) + + // TODO: If -configtest is set it should exit at this point. But changing + // this now would mean a change in behavior. Some Beats may depend on the + // Setup() method being invoked in order to do configuration validation. + // If we do not change this, it means -configtest requires the outputs to + // be available because the publisher is being started (this is not + // desirable - elastic/beats#1213). It (may?) also cause the index template + // to be loaded. +} + +// setup initializes the Publisher and then invokes the Setup method of the +// Beat. +func (bc *instance) setup() error { + logp.Info("Setup Beat: %s; Version: %s", bc.data.Name, bc.data.Version) + + debugf("Initializing output plugins") + var err error + bc.data.Publisher, err = publisher.New(bc.data.Name, bc.data.Config.Output, + bc.data.Config.Shipper) if err != nil { - return fmt.Errorf("error initializing publisher: %v\n", err) + return fmt.Errorf("error initializing publisher: %v", err) } - filters, err := filter.New(b.Config.Filter) + bc.data.Publisher.RegisterFilter(bc.data.filters) + bc.data.Events = bc.data.Publisher.Client() + + err = bc.beater.Setup(bc.data) if err != nil { - return fmt.Errorf("error initializing filters: %v\n", err) + return err } - b.Publisher = pub - pub.RegisterFilter(filters) - b.Events = pub.Client() - - logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version) - logp.Info("Filter %v", filters) + // If -configtest was specified, exit now prior to run. + if cfgfile.IsTestConfig() { + fmt.Println("Config OK") + return GracefulExit + } return nil } -// Run calls the beater Setup and Run methods. In case of errors +// run calls the beater Setup and Run methods. In case of errors // during the setup phase, it exits the process. -func (b *Beat) Run() error { +func (bc *instance) run() error { + logp.Info("%s start running.", bc.data.Name) + return bc.beater.Run(bc.data) +} - // Setup beater object - err := b.BT.Setup(b) - if err != nil { - return fmt.Errorf("setup returned an error: %v", err) - } - b.setState(SetupState) +// cleanup is invoked prior to exit for the purposes of performing any final +// clean-up. This method is guaranteed to be invoked on shutdown if the beat +// reaches the setup stage. +func (bc *instance) cleanup() error { + return bc.beater.Cleanup(bc.data) +} - // Up to here was the initialization, now about running - if cfgfile.IsTestConfig() { - logp.Info("Testing configuration file") - // all good, exit - return nil +// launch manages the lifecycle of the beat and guarantees the order in which +// the Beater methods are invokes and ensures a a proper exit code is set when +// an error occurs. The exit flag controls if this method calls os.Exit when +// it completes. +func (bc *instance) launch(exit bool) error { + err := bc.handleFlags() + if err != nil { + goto cleanup } - service.BeforeRun() - - // Callback is called if the processes is asked to stop. - // This needs to be called before the main loop is started so that - // it can register the signals that stop or query (on Windows) the loop. - service.HandleSignals(b.Exit) - logp.Info("%s sucessfully setup. Start running.", b.Name) - - b.setState(RunState) - // Run beater specific stuff - err = b.BT.Run(b) + err = bc.config() if err != nil { - logp.Critical("Running the beat returned an error: %v", err) + goto cleanup } - return err -} - -// Stop calls the beater Stop action. -// It can happen that this function is called more then once. -func (b *Beat) Stop() { - logp.Info("Stopping Beat") - - if b.getState() == RunState { - b.BT.Stop() + defer bc.cleanup() + err = bc.setup() + if err != nil { + goto cleanup } - service.Cleanup() - - logp.Info("Cleaning up %s before shutting down.", b.Name) + svc.HandleSignals(bc.beater.Stop) + err = bc.run() - if b.getState() > StopState { - // Call beater cleanup function - err := b.BT.Cleanup(b) - if err != nil { - logp.Err("Cleanup returned an error: %v", err) +cleanup: + if exit { + code := 0 + if ee, ok := err.(ExitError); ok { + code = ee.ExitCode + } else if err != nil { + code = 1 } - } - b.setState(StopState) -} - -// Exit begins exiting the beat and initiating shutdown -func (b *Beat) Exit() { - - b.callback.Do(func() { - logp.Info("Start exiting beat") - close(b.exit) - }) -} + if err != nil && code != 0 { + // logp may not be initialized so log the err to stderr too. + logp.Critical("Exiting: %v", err) + fmt.Fprintf(os.Stderr, "Exiting: %v\n", err) + } -// setState updates the state -func (b *Beat) setState(state int8) { - b.stateMutex.Lock() - defer b.stateMutex.Unlock() - b.state = state -} + os.Exit(code) + } -// getState fetches the state -func (b *Beat) getState() int8 { - b.stateMutex.Lock() - defer b.stateMutex.Unlock() - return b.state + return err } diff --git a/libbeat/beat/beat_test.go b/libbeat/beat/beat_test.go index 9056809ae74..3024772df2d 100644 --- a/libbeat/beat/beat_test.go +++ b/libbeat/beat/beat_test.go @@ -9,26 +9,24 @@ import ( "github.com/stretchr/testify/assert" ) -func Test_NewBeat(t *testing.T) { - +func TestNewInstance(t *testing.T) { tb := &TestBeater{} - b := NewBeat("testbeat", "0.9", tb) + b := newInstance("testbeat", "0.9", tb) - assert.Equal(t, "testbeat", b.Name) - assert.Equal(t, "0.9", b.Version) + assert.Equal(t, "testbeat", b.data.Name) + assert.Equal(t, "0.9", b.data.Version) // UUID4 should be 36 chars long - assert.Equal(t, 16, len(b.UUID)) - assert.Equal(t, 36, len(b.UUID.String())) + assert.Equal(t, 16, len(b.data.UUID)) + assert.Equal(t, 36, len(b.data.UUID.String())) } -func Test_NewBeat_UUID(t *testing.T) { - +func TestNewInstanceUUID(t *testing.T) { tb := &TestBeater{} - b := NewBeat("testbeat", "0.9", tb) + b := newInstance("testbeat", "0.9", tb) // Make sure the UUID's are different - assert.NotEqual(t, b.UUID, uuid.NewV4()) + assert.NotEqual(t, b.data.UUID, uuid.NewV4()) } // Test beat object diff --git a/libbeat/beat/errors.go b/libbeat/beat/errors.go new file mode 100644 index 00000000000..1c72013d964 --- /dev/null +++ b/libbeat/beat/errors.go @@ -0,0 +1,30 @@ +package beat + +import "fmt" + +var ( + // GracefulExit is an error that signals to exit with a code of 0. + GracefulExit = ExitError{} +) + +// ExitError is an error type that can be returned to set a specific exit code. +type ExitError struct { + ExitCode int + Cause error +} + +func (e ExitError) Error() string { + if e.Cause != nil { + return e.Cause.Error() + } + + return "" +} + +// NewExitError returns a new ExitError. +func NewExitError(code int, format string, args ...interface{}) error { + return ExitError{ + ExitCode: code, + Cause: fmt.Errorf(format, args), + } +} diff --git a/libbeat/cfgfile/cfgfile.go b/libbeat/cfgfile/cfgfile.go index 0d7fc184f2d..16a5df293bf 100644 --- a/libbeat/cfgfile/cfgfile.go +++ b/libbeat/cfgfile/cfgfile.go @@ -12,19 +12,17 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -// Command line flags -var configfile *string -var testConfig *bool - -func init() { - // The default config cannot include the beat name as it is not initialised when this - // function is called, but see ChangeDefaultCfgfileFlag +// Command line flags. +var ( + // The default config cannot include the beat name as it is not initialized + // when this variable is created. See ChangeDefaultCfgfileFlag which should + // be called prior to flags.Parse(). configfile = flag.String("c", "beat.yml", "Configuration file") testConfig = flag.Bool("configtest", false, "Test configuration and exit.") -} +) -// ChangeDefaultCfgfileFlag replaces the value and default value for the `-c` flag so that -// it reflects the beat name. +// ChangeDefaultCfgfileFlag replaces the value and default value for the `-c` +// flag so that it reflects the beat name. func ChangeDefaultCfgfileFlag(beatName string) error { cliflag := flag.Lookup("c") if cliflag == nil { @@ -41,30 +39,40 @@ func ChangeDefaultCfgfileFlag(beatName string) error { return cliflag.Value.Set(cliflag.DefValue) } -// Read reads the configuration from a yaml file into the given interface structure. -// In case path is not set this method reads from the default configuration file for the beat. +// Deprecated: Please use Load(). +// +// Read reads the configuration from a YAML file into the given interface +// structure. If path is empty this method reads from the configuration +// file specified by the '-c' command line flag. func Read(out interface{}, path string) error { + config, err := Load(path) + if err != nil { + return nil + } + return config.Unpack(out) +} + +// Load reads the configuration from a YAML file structure. If path is empty +// this method reads from the configuration file specified by the '-c' command +// line flag. +func Load(path string) (*common.Config, error) { if path == "" { path = *configfile } - filecontent, err := ioutil.ReadFile(path) + fileContent, err := ioutil.ReadFile(path) if err != nil { - return fmt.Errorf("Failed to read %s: %v. Exiting.", path, err) + return nil, fmt.Errorf("failed to read %s: %v", path, err) } - filecontent = expandEnv(filecontent) + fileContent = expandEnv(fileContent) - config, err := common.NewConfigWithYAML(filecontent, path) + config, err := common.NewConfigWithYAML(fileContent, path) if err != nil { - return fmt.Errorf("YAML config parsing failed on %s: %v. Exiting.", path, err) + return nil, fmt.Errorf("YAML config parsing failed on %s: %v", path, err) } - err = config.Unpack(out) - if err != nil { - return fmt.Errorf("Failed to apply config %s: %v. Exiting. ", path, err) - } - return nil + return config, nil } // IsTestConfig returns whether or not this is configuration used for testing diff --git a/libbeat/tests/system/test_base.py b/libbeat/tests/system/test_base.py index 2d37e2c6923..4a90e7856dd 100644 --- a/libbeat/tests/system/test_base.py +++ b/libbeat/tests/system/test_base.py @@ -15,9 +15,8 @@ def test_base(self): ) proc = self.start_beat() - self.wait_until( lambda: self.log_contains("Init Beat")) - exit_code = proc.kill_and_wait() - assert exit_code == 0 + self.wait_until( lambda: self.log_contains("Setup Beat")) + proc.check_kill_and_wait() def test_no_config(self): """ @@ -26,8 +25,8 @@ def test_no_config(self): exit_code = self.run_beat() assert exit_code == 1 - assert self.log_contains("loading config file error") is True - assert self.log_contains("Failed to read") is True + assert self.log_contains("error loading config file") is True + assert self.log_contains("failed to read") is True def test_invalid_config(self): """ @@ -39,7 +38,7 @@ def test_invalid_config(self): exit_code = self.run_beat(config="invalid.yml") assert exit_code == 1 - assert self.log_contains("loading config file error") is True + assert self.log_contains("error loading config file") is True assert self.log_contains("YAML config parsing failed") is True def test_config_test(self): @@ -53,7 +52,7 @@ def test_config_test(self): config="libbeat.yml", extra_args=["-configtest"]) assert exit_code == 0 - assert self.log_contains("Testing configuration file") is True + assert self.log_contains("Config OK") is True def test_version(self): """ @@ -70,7 +69,7 @@ def test_version(self): os.path.join(self.working_dir, "coverage.cov") ]) - assert self.log_contains("loading config file error") is False + assert self.log_contains("error loading config file") is False with open(os.path.join(self.working_dir, "mockbeat.log"), "wb") \ as outputfile: diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index ab71b354dd5..994ee34962e 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -28,7 +28,6 @@ package beater import ( "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper" "github.com/elastic/beats/metricbeat/include" @@ -47,7 +46,7 @@ func New() *Metricbeat { func (mb *Metricbeat) Config(b *beat.Beat) error { mb.MbConfig = &Config{} - err := cfgfile.Read(mb.MbConfig, "") + err := b.RawConfig.Unpack(mb.MbConfig) if err != nil { logp.Err("Error reading configuration file: %v", err) return err diff --git a/metricbeat/tests/system/test_base.py b/metricbeat/tests/system/test_base.py index 8d0054ccb6a..052d634077a 100644 --- a/metricbeat/tests/system/test_base.py +++ b/metricbeat/tests/system/test_base.py @@ -10,6 +10,6 @@ def test_base(self): ) proc = self.start_beat() - self.wait_until( lambda: self.log_contains("Init Beat")) + self.wait_until( lambda: self.log_contains("Setup Beat")) exit_code = proc.kill_and_wait() assert exit_code == 0 diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 94bc381911b..be38fc85845 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -3,12 +3,11 @@ package beater import ( "flag" "fmt" - "os" "runtime" + "sync" "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common/droppriv" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/service" @@ -32,7 +31,6 @@ type Packetbeat struct { CmdLineArgs CmdLineArgs Pub *publish.PacketbeatPublisher Sniff *sniffer.SnifferSetup - over chan bool services []interface { Start() @@ -78,13 +76,12 @@ func New() *Packetbeat { } // Handle custom command line flags -func (pb *Packetbeat) HandleFlags(b *beat.Beat) { +func (pb *Packetbeat) HandleFlags(b *beat.Beat) error { // -devices CLI flag if *pb.CmdLineArgs.PrintDevices { devs, err := sniffer.ListDeviceNames(true) if err != nil { - fmt.Printf("Error getting devices list: %v\n", err) - os.Exit(1) + return fmt.Errorf("Error getting devices list: %v\n", err) } if len(devs) == 0 { fmt.Printf("No devices found.") @@ -97,17 +94,19 @@ func (pb *Packetbeat) HandleFlags(b *beat.Beat) { for i, dev := range devs { fmt.Printf("%d: %s\n", i, dev) } - os.Exit(0) + return beat.GracefulExit } + return nil } // Loads the beat specific config and overwrites params based on cmd line func (pb *Packetbeat) Config(b *beat.Beat) error { // Read beat implementation config as needed for setup - err := cfgfile.Read(&pb.PbConfig, "") + err := b.RawConfig.Unpack(&pb.PbConfig) if err != nil { logp.Err("fails to read the beat config: %v, %v", err, pb.PbConfig) + return err } // CLI flags over-riding config @@ -130,7 +129,7 @@ func (pb *Packetbeat) Config(b *beat.Beat) error { // TODO: Refactor config.ConfigSingleton = pb.PbConfig - return err + return nil } // Setup packetbeat @@ -138,7 +137,7 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { if err := procs.ProcWatcher.Init(pb.PbConfig.Procs); err != nil { logp.Critical(err.Error()) - os.Exit(1) + return err } queueSize := defaultQueueSize @@ -155,22 +154,17 @@ func (pb *Packetbeat) Setup(b *beat.Beat) error { logp.Debug("main", "Initializing protocol plugins") err := protos.Protos.Init(false, pb.Pub, pb.PbConfig.Protocols) if err != nil { - logp.Critical("Initializing protocol analyzers failed: %v", err) - os.Exit(1) + return fmt.Errorf("Initializing protocol analyzers failed: %v", err) } - pb.over = make(chan bool) - logp.Debug("main", "Initializing sniffer") if err := pb.setupSniffer(); err != nil { - logp.Critical("Initializing sniffer failed: %v", err) - os.Exit(1) + return fmt.Errorf("Initializing sniffer failed: %v", err) } // This needs to be after the sniffer Init but before the sniffer Run. if err := droppriv.DropPrivileges(config.ConfigSingleton.RunOptions); err != nil { - logp.Critical(err.Error()) - os.Exit(1) + return err } return nil @@ -243,27 +237,25 @@ func (pb *Packetbeat) Run(b *beat.Beat) error { service.Start() } - // run the sniffer in background + var wg sync.WaitGroup + errC := make(chan error, 1) + + // Run the sniffer in background + wg.Add(1) go func() { + defer wg.Done() err := pb.Sniff.Run() if err != nil { - logp.Critical("Sniffer main loop failed: %v", err) - os.Exit(1) + errC <- fmt.Errorf("Sniffer main loop failed: %v", err) } - pb.over <- true }() - // Startup successful, disable stderr logging if requested by - // cmdline flag - logp.SetStderr() - logp.Debug("main", "Waiting for the sniffer to finish") - - // Wait for the goroutines to finish - for range pb.over { - if !pb.Sniff.IsAlive() { - break - } + wg.Wait() + select { + default: + case err := <-errC: + return err } // kill services diff --git a/topbeat/beater/topbeat.go b/topbeat/beater/topbeat.go index 1dc9e95b9f0..2dd8a3e461c 100644 --- a/topbeat/beater/topbeat.go +++ b/topbeat/beater/topbeat.go @@ -6,7 +6,6 @@ import ( "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/topbeat/system" @@ -36,7 +35,7 @@ func New() *Topbeat { func (tb *Topbeat) Config(b *beat.Beat) error { - err := cfgfile.Read(&tb.TbConfig, "") + err := b.RawConfig.Unpack(&tb.TbConfig) if err != nil { logp.Err("Error reading configuration file: %v", err) return err diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index b6fe8a77eef..56ebfc0b697 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -10,7 +10,6 @@ import ( "time" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" @@ -63,7 +62,7 @@ func New() *Winlogbeat { // Config sets up the necessary configuration to use the winlogbeat func (eb *Winlogbeat) Config(b *beat.Beat) error { // Read configuration. - err := cfgfile.Read(&eb.config, "") + err := b.RawConfig.Unpack(&eb.config) if err != nil { return fmt.Errorf("Error reading configuration file. %v", err) }