Skip to content

Commit

Permalink
Update Beat launcher to pass beater config section to New (#2014)
Browse files Browse the repository at this point in the history
* Pass empty config, if config section is missing
* Fix filebeat config merging + tests
* Update winlogbeat configtest error message order
  • Loading branch information
Steffen Siering authored and ruflin committed Jul 13, 2016
1 parent b8bdeac commit eecb37a
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 67 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config.Filebeat
config := fb.config

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile)
Expand Down
34 changes: 15 additions & 19 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,7 @@ const (
DefaultInputType = "log"
)

var (
DefaultConfig = Config{
FilebeatConfig{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
},
}
)

type Config struct {
Filebeat FilebeatConfig
}

type FilebeatConfig struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
Expand All @@ -41,6 +27,14 @@ type FilebeatConfig struct {
ConfigDir string `config:"config_dir"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
}
)

const (
LogInputType = "log"
StdinInputType = "stdin"
Expand Down Expand Up @@ -89,10 +83,12 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
for _, file := range configFiles {
logp.Info("Additional configs loaded from: %s", file)

tmpConfig := &Config{}
cfgfile.Read(tmpConfig, file)
tmpConfig := struct {
Filebeat Config
}{}
cfgfile.Read(&tmpConfig, file)

config.Filebeat.Prospectors = append(config.Filebeat.Prospectors, tmpConfig.Filebeat.Prospectors...)
config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...)
}

return nil
Expand All @@ -101,7 +97,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
// Fetches and merges all config files given by configDir. All are put into one config object
func (config *Config) FetchConfigs() error {

configDir := config.Filebeat.ConfigDir
configDir := config.ConfigDir

// If option not set, do nothing
if configDir == "" {
Expand All @@ -127,7 +123,7 @@ func (config *Config) FetchConfigs() error {
return err
}

if len(config.Filebeat.Prospectors) == 0 {
if len(config.Prospectors) == 0 {
err := errors.New("No paths given. What files do you want me to watch?")
log.Fatalf("%v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions filebeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestReadConfig2(t *testing.T) {
err = cfgfile.Read(config, absPath+"/config2.yml")
assert.Nil(t, err)

assert.Equal(t, uint64(0), config.Filebeat.SpoolSize)
assert.Equal(t, uint64(0), config.SpoolSize)
}

func TestGetConfigFiles_File(t *testing.T) {
Expand Down Expand Up @@ -94,5 +94,5 @@ func TestMergeConfigFiles(t *testing.T) {
config := &Config{}
mergeConfigFiles(files, config)

assert.Equal(t, 4, len(config.Filebeat.Prospectors))
assert.Equal(t, 4, len(config.Prospectors))
}
2 changes: 1 addition & 1 deletion filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type spoolerConfig struct {
// New creates and returns a new Spooler. The returned Spooler must be
// started by calling Start before it can be used.
func New(
config cfg.FilebeatConfig,
config *cfg.Config,
publisher chan<- []*input.FileEvent,
) (*Spooler, error) {

Expand Down
16 changes: 9 additions & 7 deletions filebeat/spooler/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ import (
"github.com/stretchr/testify/assert"
)

func load(t *testing.T, in string) cfg.FilebeatConfig {
func load(t *testing.T, in string) *cfg.Config {
yaml, err := common.NewConfigWithYAML([]byte(in), "")
if err != nil {
t.Fatalf("Failed to parse config input: %v", err)
}

config := cfg.DefaultConfig
err = yaml.Unpack(&config)
tmpConfig := struct {
Filebeat cfg.Config
}{cfg.DefaultConfig}
err = yaml.Unpack(&tmpConfig)
if err != nil {
t.Fatalf("Failed to unpack config: %v", err)
}

return config.Filebeat
return &tmpConfig.Filebeat
}

func TestNewSpoolerDefaultConfig(t *testing.T) {
Expand All @@ -33,13 +35,13 @@ func TestNewSpoolerDefaultConfig(t *testing.T) {
spooler, err := New(config, nil)

assert.NoError(t, err)
assert.Equal(t, cfg.DefaultConfig.Filebeat.SpoolSize, spooler.config.spoolSize)
assert.Equal(t, cfg.DefaultConfig.Filebeat.IdleTimeout, spooler.config.idleTimeout)
assert.Equal(t, cfg.DefaultConfig.SpoolSize, spooler.config.spoolSize)
assert.Equal(t, cfg.DefaultConfig.IdleTimeout, spooler.config.idleTimeout)
}

func TestNewSpoolerSpoolSize(t *testing.T) {
spoolSize := uint64(19)
config := cfg.FilebeatConfig{SpoolSize: spoolSize}
config := &cfg.Config{SpoolSize: spoolSize}
spooler, err := New(config, nil)

assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (bt *{{cookiecutter.beat|capitalize}}) Run(b *beat.Beat) error {
logp.Info("{{cookiecutter.beat}} is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.{{cookiecutter.beat|capitalize}}.Period)
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
select {
Expand Down
8 changes: 1 addition & 7 deletions generate/beat/{{cookiecutter.beat}}/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ package config
import "time"

type Config struct {
{{cookiecutter.beat|capitalize}} {{cookiecutter.beat|capitalize}}Config
}

type {{cookiecutter.beat|capitalize}}Config struct {
Period time.Duration `config:"period"`
}

var DefaultConfig = Config{
{{cookiecutter.beat|capitalize}}: {{cookiecutter.beat|capitalize}}Config{
Period: 1 * time.Second,
},
Period: 1 * time.Second,
}
18 changes: 16 additions & 2 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"math/rand"
"os"
"runtime"
"strings"
"time"

"github.com/elastic/beats/libbeat/cfgfile"
Expand Down Expand Up @@ -146,7 +147,8 @@ func newBeat(name, version string) *Beat {
}

func (b *Beat) launch(bt Creator) error {
if err := b.handleFlags(); err != nil {
err := b.handleFlags()
if err != nil {
return err
}

Expand All @@ -157,6 +159,18 @@ func (b *Beat) launch(bt Creator) error {
return err
}

// load the beats config section
var sub *common.Config
configName := strings.ToLower(b.Name)
if b.RawConfig.HasField(configName) {
sub, err = b.RawConfig.Child(configName, -1)
if err != nil {
return err
}
} else {
sub = common.NewConfig()
}

logp.Info("Setup Beat: %s; Version: %s", b.Name, b.Version)
processors, err := processors.New(b.Config.Processors)
if err != nil {
Expand All @@ -174,7 +188,7 @@ func (b *Beat) launch(bt Creator) error {
// defer publisher.Stop()

b.Publisher = publisher
beater, err := bt(b, b.RawConfig)
beater, err := bt(b, sub)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/beater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import "github.com/elastic/beats/libbeat/common"
// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
Modules []*common.Config `config:"metricbeat.modules" validate:"required"`
Modules []*common.Config `config:"modules" validate:"required"`
}
26 changes: 12 additions & 14 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ func init() {

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := config.Config{
Packetbeat: config.PacketbeatConfig{
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.File,
Loop: *cmdLineArgs.Loop,
TopSpeed: *cmdLineArgs.TopSpeed,
OneAtATime: *cmdLineArgs.OneAtAtime,
Dumpfile: *cmdLineArgs.Dumpfile,
},
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.File,
Loop: *cmdLineArgs.Loop,
TopSpeed: *cmdLineArgs.TopSpeed,
OneAtATime: *cmdLineArgs.OneAtAtime,
Dumpfile: *cmdLineArgs.Dumpfile,
},
}
err := rawConfig.Unpack(&config)
Expand All @@ -98,7 +96,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// init packetbeat components
func (pb *Packetbeat) init(b *beat.Beat) error {

cfg := &pb.Config.Packetbeat
cfg := &pb.Config
err := procs.ProcWatcher.Init(cfg.Procs)
if err != nil {
logp.Critical(err.Error())
Expand Down Expand Up @@ -148,7 +146,7 @@ func (pb *Packetbeat) Run(b *beat.Beat) error {
pb.Pub.Start()

// This needs to be after the sniffer Init but before the sniffer Run.
if err := droppriv.DropPrivileges(pb.Config.Packetbeat.RunOptions); err != nil {
if err := droppriv.DropPrivileges(pb.Config.RunOptions); err != nil {
return err
}

Expand Down Expand Up @@ -199,7 +197,7 @@ func (pb *Packetbeat) Stop() {
}

func (pb *Packetbeat) setupSniffer() error {
cfg := &pb.Config.Packetbeat
cfg := &pb.Config

withVlans := cfg.Interfaces.With_vlans
_, withICMP := cfg.Protocols["icmp"]
Expand All @@ -217,16 +215,16 @@ func (pb *Packetbeat) makeWorkerFactory(filter string) sniffer.WorkerFactory {
var f *flows.Flows
var err error

if pb.Config.Packetbeat.Flows != nil {
f, err = flows.NewFlows(pb.Pub, pb.Config.Packetbeat.Flows)
if pb.Config.Flows != nil {
f, err = flows.NewFlows(pb.Pub, pb.Config.Flows)
if err != nil {
return nil, "", err
}
}

var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
if cfg, exists := pb.Config.Packetbeat.Protocols["icmp"]; exists {
if cfg, exists := pb.Config.Protocols["icmp"]; exists {
icmp, err := icmp.New(false, pb.Pub, cfg)
if err != nil {
return nil, "", err
Expand Down
12 changes: 4 additions & 8 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ import (
)

type Config struct {
Packetbeat PacketbeatConfig
}

type PacketbeatConfig struct {
Interfaces InterfacesConfig
Flows *Flows
Protocols map[string]*common.Config
Procs procs.ProcsConfig
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
RunOptions droppriv.RunOptions
}

Expand Down
5 changes: 4 additions & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ type Winlogbeat struct {
}

// New returns a new Winlogbeat.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) {
// Read configuration.
// XXX: winlogbeat validates top-level config -> ignore beater config and
// parse complete top-level config
config := config.DefaultSettings
rawConfig := b.RawConfig
err := rawConfig.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("Error reading configuration file. %v", err)
Expand Down
4 changes: 3 additions & 1 deletion winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ var (
// Validate validates the Settings data and returns an error describing
// all problems or nil if there are none.
func (s Settings) Validate() error {
// TODO: winlogbeat should not try to validate top-level beats config

validKeys := []string{
"fields", "fields_under_root", "tags",
"name", "refresh_topology_freq", "ignore_outgoing", "topology_expire", "geoip",
"queue_size", "bulk_queue_size", "max_procs",
"filters", "logging", "output", "path", "winlogbeat",
"processors", "logging", "output", "path", "winlogbeat",
}
sort.Strings(validKeys)

Expand Down
4 changes: 2 additions & 2 deletions winlogbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestConfigValidate(t *testing.T) {
map[string]interface{}{"other": "value"},
},
"1 error: Invalid top-level key 'other' found. Valid keys are bulk_queue_size, " +
"fields, fields_under_root, filters, geoip, ignore_outgoing, logging, max_procs, " +
"name, output, path, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat",
"fields, fields_under_root, geoip, ignore_outgoing, logging, max_procs, " +
"name, output, path, processors, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat",
},
{
WinlogbeatConfig{},
Expand Down

0 comments on commit eecb37a

Please sign in to comment.