Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Beat launcher to pass beater config section to New #2014

Merged
merged 4 commits into from
Jul 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config.Filebeat
config := fb.config

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile)
Expand Down
34 changes: 15 additions & 19 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,7 @@ const (
DefaultInputType = "log"
)

var (
DefaultConfig = Config{
FilebeatConfig{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
},
}
)

type Config struct {
Filebeat FilebeatConfig
}

type FilebeatConfig struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
Expand All @@ -41,6 +27,14 @@ type FilebeatConfig struct {
ConfigDir string `config:"config_dir"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
}
)

const (
LogInputType = "log"
StdinInputType = "stdin"
Expand Down Expand Up @@ -89,10 +83,12 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
for _, file := range configFiles {
logp.Info("Additional configs loaded from: %s", file)

tmpConfig := &Config{}
cfgfile.Read(tmpConfig, file)
tmpConfig := struct {
Filebeat Config
}{}
cfgfile.Read(&tmpConfig, file)

config.Filebeat.Prospectors = append(config.Filebeat.Prospectors, tmpConfig.Filebeat.Prospectors...)
config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...)
}

return nil
Expand All @@ -101,7 +97,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
// Fetches and merges all config files given by configDir. All are put into one config object
func (config *Config) FetchConfigs() error {

configDir := config.Filebeat.ConfigDir
configDir := config.ConfigDir

// If option not set, do nothing
if configDir == "" {
Expand All @@ -127,7 +123,7 @@ func (config *Config) FetchConfigs() error {
return err
}

if len(config.Filebeat.Prospectors) == 0 {
if len(config.Prospectors) == 0 {
err := errors.New("No paths given. What files do you want me to watch?")
log.Fatalf("%v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions filebeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestReadConfig2(t *testing.T) {
err = cfgfile.Read(config, absPath+"/config2.yml")
assert.Nil(t, err)

assert.Equal(t, uint64(0), config.Filebeat.SpoolSize)
assert.Equal(t, uint64(0), config.SpoolSize)
}

func TestGetConfigFiles_File(t *testing.T) {
Expand Down Expand Up @@ -94,5 +94,5 @@ func TestMergeConfigFiles(t *testing.T) {
config := &Config{}
mergeConfigFiles(files, config)

assert.Equal(t, 4, len(config.Filebeat.Prospectors))
assert.Equal(t, 4, len(config.Prospectors))
}
2 changes: 1 addition & 1 deletion filebeat/spooler/spooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type spoolerConfig struct {
// New creates and returns a new Spooler. The returned Spooler must be
// started by calling Start before it can be used.
func New(
config cfg.FilebeatConfig,
config *cfg.Config,
publisher chan<- []*input.FileEvent,
) (*Spooler, error) {

Expand Down
16 changes: 9 additions & 7 deletions filebeat/spooler/spooler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@ import (
"github.com/stretchr/testify/assert"
)

func load(t *testing.T, in string) cfg.FilebeatConfig {
func load(t *testing.T, in string) *cfg.Config {
yaml, err := common.NewConfigWithYAML([]byte(in), "")
if err != nil {
t.Fatalf("Failed to parse config input: %v", err)
}

config := cfg.DefaultConfig
err = yaml.Unpack(&config)
tmpConfig := struct {
Filebeat cfg.Config
}{cfg.DefaultConfig}
err = yaml.Unpack(&tmpConfig)
if err != nil {
t.Fatalf("Failed to unpack config: %v", err)
}

return config.Filebeat
return &tmpConfig.Filebeat
}

func TestNewSpoolerDefaultConfig(t *testing.T) {
Expand All @@ -33,13 +35,13 @@ func TestNewSpoolerDefaultConfig(t *testing.T) {
spooler, err := New(config, nil)

assert.NoError(t, err)
assert.Equal(t, cfg.DefaultConfig.Filebeat.SpoolSize, spooler.config.spoolSize)
assert.Equal(t, cfg.DefaultConfig.Filebeat.IdleTimeout, spooler.config.idleTimeout)
assert.Equal(t, cfg.DefaultConfig.SpoolSize, spooler.config.spoolSize)
assert.Equal(t, cfg.DefaultConfig.IdleTimeout, spooler.config.idleTimeout)
}

func TestNewSpoolerSpoolSize(t *testing.T) {
spoolSize := uint64(19)
config := cfg.FilebeatConfig{SpoolSize: spoolSize}
config := &cfg.Config{SpoolSize: spoolSize}
spooler, err := New(config, nil)

assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (bt *{{cookiecutter.beat|capitalize}}) Run(b *beat.Beat) error {
logp.Info("{{cookiecutter.beat}} is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.{{cookiecutter.beat|capitalize}}.Period)
ticker := time.NewTicker(bt.config.Period)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One less :-D

counter := 1
for {
select {
Expand Down
8 changes: 1 addition & 7 deletions generate/beat/{{cookiecutter.beat}}/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@ package config
import "time"

type Config struct {
{{cookiecutter.beat|capitalize}} {{cookiecutter.beat|capitalize}}Config
}

type {{cookiecutter.beat|capitalize}}Config struct {
Period time.Duration `config:"period"`
}

var DefaultConfig = Config{
{{cookiecutter.beat|capitalize}}: {{cookiecutter.beat|capitalize}}Config{
Period: 1 * time.Second,
},
Period: 1 * time.Second,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-3 :-D

}
18 changes: 16 additions & 2 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"math/rand"
"os"
"runtime"
"strings"
"time"

"github.com/elastic/beats/libbeat/cfgfile"
Expand Down Expand Up @@ -146,7 +147,8 @@ func newBeat(name, version string) *Beat {
}

func (b *Beat) launch(bt Creator) error {
if err := b.handleFlags(); err != nil {
err := b.handleFlags()
if err != nil {
return err
}

Expand All @@ -157,6 +159,18 @@ func (b *Beat) launch(bt Creator) error {
return err
}

// load the beats config section
var sub *common.Config
configName := strings.ToLower(b.Name)
if b.RawConfig.HasField(configName) {
sub, err = b.RawConfig.Child(configName, -1)
if err != nil {
return err
}
} else {
sub = common.NewConfig()
}

logp.Info("Setup Beat: %s; Version: %s", b.Name, b.Version)
processors, err := processors.New(b.Config.Processors)
if err != nil {
Expand All @@ -174,7 +188,7 @@ func (b *Beat) launch(bt Creator) error {
// defer publisher.Stop()

b.Publisher = publisher
beater, err := bt(b, b.RawConfig)
beater, err := bt(b, sub)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/beater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ import "github.com/elastic/beats/libbeat/common"
// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
Modules []*common.Config `config:"metricbeat.modules" validate:"required"`
Modules []*common.Config `config:"modules" validate:"required"`
}
26 changes: 12 additions & 14 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,12 @@ func init() {

func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config := config.Config{
Packetbeat: config.PacketbeatConfig{
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.File,
Loop: *cmdLineArgs.Loop,
TopSpeed: *cmdLineArgs.TopSpeed,
OneAtATime: *cmdLineArgs.OneAtAtime,
Dumpfile: *cmdLineArgs.Dumpfile,
},
Interfaces: config.InterfacesConfig{
File: *cmdLineArgs.File,
Loop: *cmdLineArgs.Loop,
TopSpeed: *cmdLineArgs.TopSpeed,
OneAtATime: *cmdLineArgs.OneAtAtime,
Dumpfile: *cmdLineArgs.Dumpfile,
},
}
err := rawConfig.Unpack(&config)
Expand All @@ -98,7 +96,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// init packetbeat components
func (pb *Packetbeat) init(b *beat.Beat) error {

cfg := &pb.Config.Packetbeat
cfg := &pb.Config
err := procs.ProcWatcher.Init(cfg.Procs)
if err != nil {
logp.Critical(err.Error())
Expand Down Expand Up @@ -148,7 +146,7 @@ func (pb *Packetbeat) Run(b *beat.Beat) error {
pb.Pub.Start()

// This needs to be after the sniffer Init but before the sniffer Run.
if err := droppriv.DropPrivileges(pb.Config.Packetbeat.RunOptions); err != nil {
if err := droppriv.DropPrivileges(pb.Config.RunOptions); err != nil {
return err
}

Expand Down Expand Up @@ -199,7 +197,7 @@ func (pb *Packetbeat) Stop() {
}

func (pb *Packetbeat) setupSniffer() error {
cfg := &pb.Config.Packetbeat
cfg := &pb.Config

withVlans := cfg.Interfaces.With_vlans
_, withICMP := cfg.Protocols["icmp"]
Expand All @@ -217,16 +215,16 @@ func (pb *Packetbeat) makeWorkerFactory(filter string) sniffer.WorkerFactory {
var f *flows.Flows
var err error

if pb.Config.Packetbeat.Flows != nil {
f, err = flows.NewFlows(pb.Pub, pb.Config.Packetbeat.Flows)
if pb.Config.Flows != nil {
f, err = flows.NewFlows(pb.Pub, pb.Config.Flows)
if err != nil {
return nil, "", err
}
}

var icmp4 icmp.ICMPv4Processor
var icmp6 icmp.ICMPv6Processor
if cfg, exists := pb.Config.Packetbeat.Protocols["icmp"]; exists {
if cfg, exists := pb.Config.Protocols["icmp"]; exists {
icmp, err := icmp.New(false, pb.Pub, cfg)
if err != nil {
return nil, "", err
Expand Down
12 changes: 4 additions & 8 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,10 @@ import (
)

type Config struct {
Packetbeat PacketbeatConfig
}

type PacketbeatConfig struct {
Interfaces InterfacesConfig
Flows *Flows
Protocols map[string]*common.Config
Procs procs.ProcsConfig
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
RunOptions droppriv.RunOptions
}

Expand Down
5 changes: 4 additions & 1 deletion winlogbeat/beater/winlogbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ type Winlogbeat struct {
}

// New returns a new Winlogbeat.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) {
// Read configuration.
// XXX: winlogbeat validates top-level config -> ignore beater config and
// parse complete top-level config
config := config.DefaultSettings
rawConfig := b.RawConfig
err := rawConfig.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("Error reading configuration file. %v", err)
Expand Down
4 changes: 3 additions & 1 deletion winlogbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ var (
// Validate validates the Settings data and returns an error describing
// all problems or nil if there are none.
func (s Settings) Validate() error {
// TODO: winlogbeat should not try to validate top-level beats config

validKeys := []string{
"fields", "fields_under_root", "tags",
"name", "refresh_topology_freq", "ignore_outgoing", "topology_expire", "geoip",
"queue_size", "bulk_queue_size", "max_procs",
"filters", "logging", "output", "path", "winlogbeat",
"processors", "logging", "output", "path", "winlogbeat",
}
sort.Strings(validKeys)

Expand Down
4 changes: 2 additions & 2 deletions winlogbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestConfigValidate(t *testing.T) {
map[string]interface{}{"other": "value"},
},
"1 error: Invalid top-level key 'other' found. Valid keys are bulk_queue_size, " +
"fields, fields_under_root, filters, geoip, ignore_outgoing, logging, max_procs, " +
"name, output, path, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat",
"fields, fields_under_root, geoip, ignore_outgoing, logging, max_procs, " +
"name, output, path, processors, queue_size, refresh_topology_freq, tags, topology_expire, winlogbeat",
},
{
WinlogbeatConfig{},
Expand Down