From 46fd84daa0c2bb2091a43074ab84a4ba363f9d00 Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 12 Jul 2016 14:31:21 +0200 Subject: [PATCH 1/4] Update Beat launcher to pass beater config section to New --- filebeat/beater/filebeat.go | 2 +- filebeat/config/config.go | 28 ++++++++----------- filebeat/spooler/spooler.go | 2 +- .../beater/{{cookiecutter.beat}}.go | 2 +- .../{{cookiecutter.beat}}/config/config.go | 8 +----- libbeat/beat/beat.go | 9 +++++- metricbeat/beater/config.go | 2 +- packetbeat/beater/packetbeat.go | 26 ++++++++--------- packetbeat/config/config.go | 12 +++----- winlogbeat/beater/winlogbeat.go | 5 +++- winlogbeat/config/config.go | 4 ++- 11 files changed, 47 insertions(+), 53 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 22b335e64227..121aadbdace7 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -41,7 +41,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // Run allows the beater to be run as a beat. func (fb *Filebeat) Run(b *beat.Beat) error { var err error - config := fb.config.Filebeat + config := fb.config // Setup registrar to persist state registrar, err := registrar.New(config.RegistryFile) diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 8e60ceccd45f..201f9eee8185 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -18,21 +18,7 @@ const ( DefaultInputType = "log" ) -var ( - DefaultConfig = Config{ - FilebeatConfig{ - RegistryFile: "registry", - SpoolSize: 2048, - IdleTimeout: 5 * time.Second, - }, - } -) - type Config struct { - Filebeat FilebeatConfig -} - -type FilebeatConfig struct { Prospectors []*common.Config `config:"prospectors"` SpoolSize uint64 `config:"spool_size" validate:"min=1"` PublishAsync bool `config:"publish_async"` @@ -41,6 +27,14 @@ type FilebeatConfig struct { ConfigDir string `config:"config_dir"` } +var ( + DefaultConfig = Config{ + RegistryFile: "registry", + SpoolSize: 2048, + IdleTimeout: 5 * time.Second, + } +) + const ( LogInputType = "log" StdinInputType = "stdin" @@ -92,7 +86,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error { tmpConfig := &Config{} cfgfile.Read(tmpConfig, file) - config.Filebeat.Prospectors = append(config.Filebeat.Prospectors, tmpConfig.Filebeat.Prospectors...) + config.Prospectors = append(config.Prospectors, tmpConfig.Prospectors...) } return nil @@ -101,7 +95,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error { // Fetches and merges all config files given by configDir. All are put into one config object func (config *Config) FetchConfigs() error { - configDir := config.Filebeat.ConfigDir + configDir := config.ConfigDir // If option not set, do nothing if configDir == "" { @@ -127,7 +121,7 @@ func (config *Config) FetchConfigs() error { return err } - if len(config.Filebeat.Prospectors) == 0 { + if len(config.Prospectors) == 0 { err := errors.New("No paths given. What files do you want me to watch?") log.Fatalf("%v", err) return err diff --git a/filebeat/spooler/spooler.go b/filebeat/spooler/spooler.go index d9afad1a97ff..29e24da2b301 100644 --- a/filebeat/spooler/spooler.go +++ b/filebeat/spooler/spooler.go @@ -33,7 +33,7 @@ type spoolerConfig struct { // New creates and returns a new Spooler. The returned Spooler must be // started by calling Start before it can be used. func New( - config cfg.FilebeatConfig, + config *cfg.Config, publisher chan<- []*input.FileEvent, ) (*Spooler, error) { diff --git a/generate/beat/{{cookiecutter.beat}}/beater/{{cookiecutter.beat}}.go b/generate/beat/{{cookiecutter.beat}}/beater/{{cookiecutter.beat}}.go index 60d67b078c81..5627053f08ce 100644 --- a/generate/beat/{{cookiecutter.beat}}/beater/{{cookiecutter.beat}}.go +++ b/generate/beat/{{cookiecutter.beat}}/beater/{{cookiecutter.beat}}.go @@ -36,7 +36,7 @@ func (bt *{{cookiecutter.beat|capitalize}}) Run(b *beat.Beat) error { logp.Info("{{cookiecutter.beat}} is running! Hit CTRL-C to stop it.") bt.client = b.Publisher.Connect() - ticker := time.NewTicker(bt.config.{{cookiecutter.beat|capitalize}}.Period) + ticker := time.NewTicker(bt.config.Period) counter := 1 for { select { diff --git a/generate/beat/{{cookiecutter.beat}}/config/config.go b/generate/beat/{{cookiecutter.beat}}/config/config.go index 373b0da35c0c..a4ad302e9941 100644 --- a/generate/beat/{{cookiecutter.beat}}/config/config.go +++ b/generate/beat/{{cookiecutter.beat}}/config/config.go @@ -6,15 +6,9 @@ package config import "time" type Config struct { - {{cookiecutter.beat|capitalize}} {{cookiecutter.beat|capitalize}}Config -} - -type {{cookiecutter.beat|capitalize}}Config struct { Period time.Duration `config:"period"` } var DefaultConfig = Config{ - {{cookiecutter.beat|capitalize}}: {{cookiecutter.beat|capitalize}}Config{ - Period: 1 * time.Second, - }, + Period: 1 * time.Second, } diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 2a0f54baaae7..211a36b6e997 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -39,6 +39,7 @@ import ( "math/rand" "os" "runtime" + "strings" "time" "github.com/elastic/beats/libbeat/cfgfile" @@ -157,6 +158,12 @@ func (b *Beat) launch(bt Creator) error { return err } + // load the beats config section + sub, err := b.RawConfig.Child(strings.ToLower(b.Name), -1) + if err != nil { + return err + } + logp.Info("Setup Beat: %s; Version: %s", b.Name, b.Version) processors, err := processors.New(b.Config.Processors) if err != nil { @@ -174,7 +181,7 @@ func (b *Beat) launch(bt Creator) error { // defer publisher.Stop() b.Publisher = publisher - beater, err := bt(b, b.RawConfig) + beater, err := bt(b, sub) if err != nil { return err } diff --git a/metricbeat/beater/config.go b/metricbeat/beater/config.go index ed5e5aa0d67f..f09c1fc9572a 100644 --- a/metricbeat/beater/config.go +++ b/metricbeat/beater/config.go @@ -5,5 +5,5 @@ import "github.com/elastic/beats/libbeat/common" // Config is the root of the Metricbeat configuration hierarchy. type Config struct { // Modules is a list of module specific configuration data. - Modules []*common.Config `config:"metricbeat.modules" validate:"required"` + Modules []*common.Config `config:"modules" validate:"required"` } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index f143fcfe11d8..51558b07a080 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -67,14 +67,12 @@ func init() { func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config := config.Config{ - Packetbeat: config.PacketbeatConfig{ - Interfaces: config.InterfacesConfig{ - File: *cmdLineArgs.File, - Loop: *cmdLineArgs.Loop, - TopSpeed: *cmdLineArgs.TopSpeed, - OneAtATime: *cmdLineArgs.OneAtAtime, - Dumpfile: *cmdLineArgs.Dumpfile, - }, + Interfaces: config.InterfacesConfig{ + File: *cmdLineArgs.File, + Loop: *cmdLineArgs.Loop, + TopSpeed: *cmdLineArgs.TopSpeed, + OneAtATime: *cmdLineArgs.OneAtAtime, + Dumpfile: *cmdLineArgs.Dumpfile, }, } err := rawConfig.Unpack(&config) @@ -98,7 +96,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // init packetbeat components func (pb *Packetbeat) init(b *beat.Beat) error { - cfg := &pb.Config.Packetbeat + cfg := &pb.Config err := procs.ProcWatcher.Init(cfg.Procs) if err != nil { logp.Critical(err.Error()) @@ -148,7 +146,7 @@ func (pb *Packetbeat) Run(b *beat.Beat) error { pb.Pub.Start() // This needs to be after the sniffer Init but before the sniffer Run. - if err := droppriv.DropPrivileges(pb.Config.Packetbeat.RunOptions); err != nil { + if err := droppriv.DropPrivileges(pb.Config.RunOptions); err != nil { return err } @@ -199,7 +197,7 @@ func (pb *Packetbeat) Stop() { } func (pb *Packetbeat) setupSniffer() error { - cfg := &pb.Config.Packetbeat + cfg := &pb.Config withVlans := cfg.Interfaces.With_vlans _, withICMP := cfg.Protocols["icmp"] @@ -217,8 +215,8 @@ func (pb *Packetbeat) makeWorkerFactory(filter string) sniffer.WorkerFactory { var f *flows.Flows var err error - if pb.Config.Packetbeat.Flows != nil { - f, err = flows.NewFlows(pb.Pub, pb.Config.Packetbeat.Flows) + if pb.Config.Flows != nil { + f, err = flows.NewFlows(pb.Pub, pb.Config.Flows) if err != nil { return nil, "", err } @@ -226,7 +224,7 @@ func (pb *Packetbeat) makeWorkerFactory(filter string) sniffer.WorkerFactory { var icmp4 icmp.ICMPv4Processor var icmp6 icmp.ICMPv6Processor - if cfg, exists := pb.Config.Packetbeat.Protocols["icmp"]; exists { + if cfg, exists := pb.Config.Protocols["icmp"]; exists { icmp, err := icmp.New(false, pb.Pub, cfg) if err != nil { return nil, "", err diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 8cef7d4b4d72..91a8dad38cf2 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -9,14 +9,10 @@ import ( ) type Config struct { - Packetbeat PacketbeatConfig -} - -type PacketbeatConfig struct { - Interfaces InterfacesConfig - Flows *Flows - Protocols map[string]*common.Config - Procs procs.ProcsConfig + Interfaces InterfacesConfig `config:"interfaces"` + Flows *Flows `config:"flows"` + Protocols map[string]*common.Config `config:"protocols"` + Procs procs.ProcsConfig `config:"procs"` RunOptions droppriv.RunOptions } diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index 52619f904290..8abaf9f9ca59 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -53,9 +53,12 @@ type Winlogbeat struct { } // New returns a new Winlogbeat. -func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { +func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) { // Read configuration. + // XXX: winlogbeat validates top-level config -> ignore beater config and + // parse complete top-level config config := config.DefaultSettings + rawConfig := b.RawConfig err := rawConfig.Unpack(&config) if err != nil { return nil, fmt.Errorf("Error reading configuration file. %v", err) diff --git a/winlogbeat/config/config.go b/winlogbeat/config/config.go index f811c678cdfd..4681d4e17bfd 100644 --- a/winlogbeat/config/config.go +++ b/winlogbeat/config/config.go @@ -42,11 +42,13 @@ var ( // Validate validates the Settings data and returns an error describing // all problems or nil if there are none. func (s Settings) Validate() error { + // TODO: winlogbeat should not try to validate top-level beats config + validKeys := []string{ "fields", "fields_under_root", "tags", "name", "refresh_topology_freq", "ignore_outgoing", "topology_expire", "geoip", "queue_size", "bulk_queue_size", "max_procs", - "filters", "logging", "output", "path", "winlogbeat", + "processors", "logging", "output", "path", "winlogbeat", } sort.Strings(validKeys) From 07a373fbc1da9f80e741e4ba29edacb1cd1f7c5e Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 12 Jul 2016 15:55:08 +0200 Subject: [PATCH 2/4] Pass empty config, if config section is missing --- libbeat/beat/beat.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 211a36b6e997..b7053c3829f8 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -147,7 +147,8 @@ func newBeat(name, version string) *Beat { } func (b *Beat) launch(bt Creator) error { - if err := b.handleFlags(); err != nil { + err := b.handleFlags() + if err != nil { return err } @@ -159,9 +160,15 @@ func (b *Beat) launch(bt Creator) error { } // load the beats config section - sub, err := b.RawConfig.Child(strings.ToLower(b.Name), -1) - if err != nil { - return err + var sub *common.Config + configName := strings.ToLower(b.Name) + if b.RawConfig.HasField(configName) { + sub, err = b.RawConfig.Child(configName, -1) + if err != nil { + return err + } + } else { + sub = common.NewConfig() } logp.Info("Setup Beat: %s; Version: %s", b.Name, b.Version) From 4b992ae38ad2a1c0bbc8c597eef5ad6a07c8062a Mon Sep 17 00:00:00 2001 From: urso Date: Tue, 12 Jul 2016 16:24:18 +0200 Subject: [PATCH 3/4] Fix filebeat config merging + tests --- filebeat/config/config.go | 8 +++++--- filebeat/config/config_test.go | 4 ++-- filebeat/spooler/spooler_test.go | 16 +++++++++------- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 201f9eee8185..8d8d2439c87a 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -83,10 +83,12 @@ func mergeConfigFiles(configFiles []string, config *Config) error { for _, file := range configFiles { logp.Info("Additional configs loaded from: %s", file) - tmpConfig := &Config{} - cfgfile.Read(tmpConfig, file) + tmpConfig := struct { + Filebeat Config + }{} + cfgfile.Read(&tmpConfig, file) - config.Prospectors = append(config.Prospectors, tmpConfig.Prospectors...) + config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...) } return nil diff --git a/filebeat/config/config_test.go b/filebeat/config/config_test.go index ca12c40862c9..0fe866d19c7e 100644 --- a/filebeat/config/config_test.go +++ b/filebeat/config/config_test.go @@ -23,7 +23,7 @@ func TestReadConfig2(t *testing.T) { err = cfgfile.Read(config, absPath+"/config2.yml") assert.Nil(t, err) - assert.Equal(t, uint64(0), config.Filebeat.SpoolSize) + assert.Equal(t, uint64(0), config.SpoolSize) } func TestGetConfigFiles_File(t *testing.T) { @@ -94,5 +94,5 @@ func TestMergeConfigFiles(t *testing.T) { config := &Config{} mergeConfigFiles(files, config) - assert.Equal(t, 4, len(config.Filebeat.Prospectors)) + assert.Equal(t, 4, len(config.Prospectors)) } diff --git a/filebeat/spooler/spooler_test.go b/filebeat/spooler/spooler_test.go index 76273d9f4f1d..21a9beac2ad0 100644 --- a/filebeat/spooler/spooler_test.go +++ b/filebeat/spooler/spooler_test.go @@ -11,19 +11,21 @@ import ( "github.com/stretchr/testify/assert" ) -func load(t *testing.T, in string) cfg.FilebeatConfig { +func load(t *testing.T, in string) *cfg.Config { yaml, err := common.NewConfigWithYAML([]byte(in), "") if err != nil { t.Fatalf("Failed to parse config input: %v", err) } - config := cfg.DefaultConfig - err = yaml.Unpack(&config) + tmpConfig := struct { + Filebeat cfg.Config + }{cfg.DefaultConfig} + err = yaml.Unpack(&tmpConfig) if err != nil { t.Fatalf("Failed to unpack config: %v", err) } - return config.Filebeat + return &tmpConfig.Filebeat } func TestNewSpoolerDefaultConfig(t *testing.T) { @@ -33,13 +35,13 @@ func TestNewSpoolerDefaultConfig(t *testing.T) { spooler, err := New(config, nil) assert.NoError(t, err) - assert.Equal(t, cfg.DefaultConfig.Filebeat.SpoolSize, spooler.config.spoolSize) - assert.Equal(t, cfg.DefaultConfig.Filebeat.IdleTimeout, spooler.config.idleTimeout) + assert.Equal(t, cfg.DefaultConfig.SpoolSize, spooler.config.spoolSize) + assert.Equal(t, cfg.DefaultConfig.IdleTimeout, spooler.config.idleTimeout) } func TestNewSpoolerSpoolSize(t *testing.T) { spoolSize := uint64(19) - config := cfg.FilebeatConfig{SpoolSize: spoolSize} + config := &cfg.Config{SpoolSize: spoolSize} spooler, err := New(config, nil) assert.NoError(t, err) From b05416518249303a4c62b86b22b4a17ddc170035 Mon Sep 17 00:00:00 2001 From: urso Date: Wed, 13 Jul 2016 00:09:52 +0200 Subject: [PATCH 4/4] Update winlogbeat configtest error message order --- winlogbeat/config/config_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/winlogbeat/config/config_test.go b/winlogbeat/config/config_test.go index e28406656e09..2bb9e470bc83 100644 --- a/winlogbeat/config/config_test.go +++ b/winlogbeat/config/config_test.go @@ -45,8 +45,8 @@ func TestConfigValidate(t *testing.T) { map[string]interface{}{"other": "value"}, }, "1 error: Invalid top-level key 'other' found. Valid keys are bulk_queue_size, " + - "fields, fields_under_root, filters, geoip, ignore_outgoing, logging, max_procs, " + - "name, output, path, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat", + "fields, fields_under_root, geoip, ignore_outgoing, logging, max_procs, " + + "name, output, path, processors, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat", }, { WinlogbeatConfig{},