Skip to content

Commit

Permalink
Add support to configure xpack monitoring output
Browse files Browse the repository at this point in the history
- Add support to configure `monitoring.elasticsearch`. If
  `monitoring.elasticsearch` is missing, configuration from
  `output.elasticsearch` will be used. If both are present,
  `monitoring.elasticsearch` re-uses missing options from
  `outputs.elasticsearch`. E.g. this way timeouts and user/password can be
  configured in `monitoring.elasticsearch`, while re-using `hosts` and other
  connection settings.
- checks if xpack.monitoring.elasticsearch.hosts not configured if
  output.elasticsearch.hosts is configured
- send and publisher loop disconnected from event publisher to publish metric
  snapshots collected via `libbeat/monitoring`
- Introduce common.BeatInfo structure being initialized on startup with
  common beat meta-data to be used by outputs and monitoring reporter:
    - beat (e.g. filebeat, metricbeat, ...)
    - version
    - name (configure name, otherwise hostname)
    - hostname
    - uuid
  • Loading branch information
urso committed Feb 16, 2017
1 parent 1b6af55 commit 57f87b6
Show file tree
Hide file tree
Showing 22 changed files with 1,021 additions and 69 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, fmt.Errorf("Error reading config file: %v", err)
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Version)
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info.Version)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion generator/beat/{beat}/beater/{beat}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (bt *{Beat}) Run(b *beat.Beat) error {

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Name,
"type": b.Info.Name,
"counter": counter,
}
bt.client.PublishEvent(event)
Expand Down
68 changes: 49 additions & 19 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/dashboards/dashboards"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring/report"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
Expand All @@ -57,6 +58,9 @@ import (
// Register default processors.
_ "github.com/elastic/beats/libbeat/processors/actions"
_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"

// Register default monitoring reporting
_ "github.com/elastic/beats/libbeat/monitoring/report/elasticsearch"
)

// Beater is the interface that must be implemented by every Beat. A Beater
Expand Down Expand Up @@ -87,9 +91,7 @@ type Creator func(*Beat, *common.Config) (Beater, error)
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Name string // Beat name.
Version string // Beat version number. Defaults to the libbeat version when an implementation does not set a version.
UUID uuid.UUID // ID assigned to a Beat instance.
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher
Expand All @@ -99,6 +101,7 @@ type Beat struct {
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output map[string]*common.Config `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Expand Down Expand Up @@ -138,20 +141,35 @@ func init() {
// implementation. bt is the `Creator` callback for creating a new beater
// instance.
func Run(name, version string, bt Creator) error {
return handleError(newBeat(name, version).launch(bt))
return handleError(func() error {
b, err := newBeat(name, version)
if err != nil {
return err
}
return b.launch(bt)
}())
}

// newBeat creates a new beat instance
func newBeat(name, version string) *Beat {
func newBeat(name, version string) (*Beat, error) {
if version == "" {
version = defaultBeatVersion
}

return &Beat{
Name: name,
Version: version,
UUID: uuid.NewV4(),
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

return &Beat{
Info: common.BeatInfo{
Beat: name,
Version: version,
Name: hostname,
Hostname: hostname,
UUID: uuid.NewV4(),
},
}, nil
}

func (b *Beat) launch(bt Creator) error {
Expand All @@ -173,7 +191,7 @@ func (b *Beat) launch(bt Creator) error {

// load the beats config section
var sub *common.Config
configName := strings.ToLower(b.Name)
configName := strings.ToLower(b.Info.Beat)
if b.RawConfig.HasField(configName) {
sub, err = b.RawConfig.Child(configName, -1)
if err != nil {
Expand All @@ -183,14 +201,14 @@ func (b *Beat) launch(bt Creator) error {
sub = common.NewConfig()
}

logp.Info("Setup Beat: %s; Version: %s", b.Name, b.Version)
logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)
processors, err := processors.New(b.Config.Processors)
if err != nil {
return fmt.Errorf("error initializing processors: %v", err)
}

debugf("Initializing output plugins")
publisher, err := publisher.New(b.Name, b.Version, b.Config.Output, b.Config.Shipper, processors)
publisher, err := publisher.New(b.Info, b.Config.Output, b.Config.Shipper, processors)
if err != nil {
return fmt.Errorf("error initializing publisher: %v", err)
}
Expand All @@ -205,6 +223,14 @@ func (b *Beat) launch(bt Creator) error {
return err
}

if b.Config.Monitoring.Enabled() {
reporter, err := report.New(b.Info, b.Config.Monitoring, b.Config.Output)
if err != nil {
return err
}
defer reporter.Stop()
}

// If -configtest was specified, exit now prior to run.
if cfgfile.IsTestConfig() {
fmt.Println("Config OK")
Expand All @@ -218,8 +244,8 @@ func (b *Beat) launch(bt Creator) error {
return err
}

logp.Info("%s start running.", b.Name)
defer logp.Info("%s stopped.", b.Name)
logp.Info("%s start running.", b.Info.Beat)
defer logp.Info("%s stopped.", b.Info.Beat)
defer logp.LogTotalExpvars(&b.Config.Logging)

return beater.Run(b)
Expand All @@ -230,19 +256,19 @@ func (b *Beat) launch(bt Creator) error {
func (b *Beat) handleFlags() error {
// Due to a dependence upon the beat name, the default config file path
// must be updated prior to CLI flag handling.
err := cfgfile.ChangeDefaultCfgfileFlag(b.Name)
err := cfgfile.ChangeDefaultCfgfileFlag(b.Info.Beat)
if err != nil {
return fmt.Errorf("failed to set default config file path: %v", err)
}
flag.Parse()

if *printVersion {
fmt.Printf("%s version %s (%s), libbeat %s\n",
b.Name, b.Version, runtime.GOARCH, defaultBeatVersion)
b.Info.Beat, b.Info.Version, runtime.GOARCH, defaultBeatVersion)
return GracefulExit
}

if err := logp.HandleFlags(b.Name); err != nil {
if err := logp.HandleFlags(b.Info.Beat); err != nil {
return err
}
if err := cfgfile.HandleFlags(); err != nil {
Expand All @@ -269,12 +295,16 @@ func (b *Beat) configure() error {
return fmt.Errorf("error unpacking config data: %v", err)
}

if name := b.Config.Shipper.Name; name != "" {
b.Info.Name = name
}

err = paths.InitPaths(&b.Config.Path)
if err != nil {
return fmt.Errorf("error setting default paths: %v", err)
}

err = logp.Init(b.Name, &b.Config.Logging)
err = logp.Init(b.Info.Beat, &b.Config.Logging)
if err != nil {
return fmt.Errorf("error initializing logging: %v", err)
}
Expand Down Expand Up @@ -317,7 +347,7 @@ func (b *Beat) loadDashboards() error {
}
defer esClient.Close()

err = dashboards.ImportDashboards(b.Name, b.Version, esClient, b.Config.Dashboards)
err = dashboards.ImportDashboards(b.Info.Beat, b.Info.Version, esClient, b.Config.Dashboards)
if err != nil {
return fmt.Errorf("Error importing Kibana dashboards: %v", err)
}
Expand Down
20 changes: 13 additions & 7 deletions libbeat/beat/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ import (
)

func TestNewInstance(t *testing.T) {
b := newBeat("testbeat", "0.9")
b, err := newBeat("testbeat", "0.9")
if err != nil {
panic(err)
}

assert.Equal(t, "testbeat", b.Name)
assert.Equal(t, "0.9", b.Version)
assert.Equal(t, "testbeat", b.Info.Beat)
assert.Equal(t, "0.9", b.Info.Version)

// UUID4 should be 36 chars long
assert.Equal(t, 16, len(b.UUID))
assert.Equal(t, 36, len(b.UUID.String()))
assert.Equal(t, 16, len(b.Info.UUID))
assert.Equal(t, 36, len(b.Info.UUID.String()))
}

func TestNewInstanceUUID(t *testing.T) {
b := newBeat("testbeat", "0.9")
b, err := newBeat("testbeat", "0.9")
if err != nil {
panic(err)
}

// Make sure the UUID's are different
assert.NotEqual(t, b.UUID, uuid.NewV4())
assert.NotEqual(t, b.Info.UUID, uuid.NewV4())
}
12 changes: 12 additions & 0 deletions libbeat/common/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

import "github.com/satori/go.uuid"

// BeatInfo stores a beats instance meta data.
type BeatInfo struct {
Beat string // The actual beat its name
Version string // The beat version. Defaults to the libbeat version when an implementation does not set a version
Name string // configured beat name
Hostname string // hostname
UUID uuid.UUID // ID assigned to beat instance
}
2 changes: 1 addition & 1 deletion libbeat/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (m makeExpvar) String() string { return m() }

func addVar(r *Registry, name string, opts []Option, v Var, ev expvar.Var) {
O := varOpts(r.opts, opts)
r.Add(name, v, O.mode)
r.doAdd(name, v, O)
if O.publishExpvar && ev != nil {
expvar.Publish(fullName(r, name), ev)
}
Expand Down
16 changes: 16 additions & 0 deletions libbeat/monitoring/mode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "errors"

type Mode uint8

//go:generate stringer -type=Mode
const (
// Reported mode, is lowest report level with most basic metrics only
Reported Mode = iota
Expand Down
13 changes: 11 additions & 2 deletions libbeat/monitoring/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ func Report(o options) options {
return o
}

func varOpts(regOpts *options, opts []Option) options {
func DoNotReport(o options) options {
o.mode = Full
return o
}

func varOpts(regOpts *options, opts []Option) *options {
if regOpts != nil && len(opts) == 0 {
return regOpts
}

O := defaultOptions
if regOpts != nil {
O = *regOpts
Expand All @@ -40,7 +49,7 @@ func varOpts(regOpts *options, opts []Option) options {
for _, opt := range opts {
O = opt(O)
}
return O
return &O
}

func applyOpts(in *options, opts []Option) *options {
Expand Down
23 changes: 17 additions & 6 deletions libbeat/monitoring/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,21 @@ func (r *Registry) Clear() error {
// Add adds a new variable to the registry. The method panics if the variables
// name is already in use.
func (r *Registry) Add(name string, v Var, m Mode) {
panicErr(r.addNames(strings.Split(name, "."), v, m))
opts := r.opts
if m != opts.mode {
tmp := *r.opts
tmp.mode = m
opts = &tmp
}

panicErr(r.addNames(strings.Split(name, "."), v, opts))
}

func (r *Registry) doAdd(name string, v Var, opts *options) {
panicErr(r.addNames(strings.Split(name, "."), v, opts))
}

func (r *Registry) addNames(names []string, v Var, m Mode) error {
func (r *Registry) addNames(names []string, v Var, opts *options) error {
r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -139,7 +150,7 @@ func (r *Registry) addNames(names []string, v Var, m Mode) error {
return fmt.Errorf("name %v already used", name)
}

r.entries[name] = entry{v, m}
r.entries[name] = entry{v, opts.mode}
return nil
}

Expand All @@ -149,12 +160,12 @@ func (r *Registry) addNames(names []string, v Var, m Mode) error {
return fmt.Errorf("name %v already used", name)
}

return reg.addNames(names[1:], v, m)
return reg.addNames(names[1:], v, opts)
}

sub := NewRegistry()
sub.opts = r.opts
if err := sub.addNames(names[1:], v, m); err != nil {
sub.opts = opts
if err := sub.addNames(names[1:], v, opts); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 57f87b6

Please sign in to comment.