Dynamic configuration reloading for modules in Metricbeat (#3281)
Currently, if module configs have to be added or changed, Metricbeat needs to be restarted. This change allows defining a configuration directory in which files can be added, removed, or modified; the updates are automatically picked up by Metricbeat, and modules are started or stopped accordingly. This is especially useful in container environments where one container is used to monitor all services in other containers on the same host. Containers appear and disappear dynamically, which changes which modules are needed and which hosts must be monitored.

**Configuration**

The configuration in the main metricbeat.yml config file looks as follows:
```
metricbeat.reload.modules:
  enabled: true
  path: configs/*.yml
  period: 10s
```

A path containing a glob must be defined to select which files are checked for changes. The period sets how often the files are checked.

The configuration inside the files matched by the glob looks as follows:
```
- module: system
  metricsets: ["cpu"]
  enabled: false
  period: 1s

- module: system
  metricsets: ["network"]
  enabled: true
  period: 10s
```

Each file directly contains a list of modules. Each file can contain one or multiple module definitions.

**How does it work**

When Metricbeat starts, it launches a process that monitors the files matched by the configured glob for changes. If a change is detected, all config files are reloaded and module wrappers are created. A hash is computed for each module wrapper to determine whether the module is already running or is new. After all modules are loaded, running modules that are no longer in the list are stopped and removed, and all new ones are started.

As a consequence, if two identical module configurations exist, only one is started. Having two modules with exactly the same configuration is not expected.

The dynamic modules have no effect on the modules defined in the global configuration file. Those modules never change and keep running. Also, if a dynamic module has exactly the same config as a global module, the two are not correlated and both will run.
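The start/stop decision described above can be sketched in Go as follows. This is a minimal illustration, not the code from this commit: `moduleConfig` and `reconcile` are hypothetical names, and FNV-1a from the standard library stands in for the `hashstructure` package so that the sketch has no external dependencies.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// moduleConfig is a hypothetical, simplified stand-in for a module definition.
type moduleConfig struct {
	Module     string
	MetricSets []string
	Period     string
}

// hash derives an identifier from the configuration contents, so two
// identical configurations map to the same value.
func (c moduleConfig) hash() uint64 {
	h := fnv.New64a()
	fmt.Fprintf(h, "%s|%v|%s", c.Module, c.MetricSets, c.Period)
	return h.Sum64()
}

// reconcile compares the running modules (keyed by hash) with the freshly
// loaded configurations and reports which hashes to start and which to stop.
func reconcile(running map[uint64]moduleConfig, loaded []moduleConfig) (start, stop []uint64) {
	seen := make(map[uint64]bool, len(loaded))
	for _, c := range loaded {
		h := c.hash()
		if seen[h] {
			continue // identical config listed twice: only one instance
		}
		seen[h] = true
		if _, ok := running[h]; !ok {
			start = append(start, h) // not running yet
		}
	}
	for h := range running {
		if !seen[h] {
			stop = append(stop, h) // no longer present in any file
		}
	}
	return start, stop
}

func main() {
	cpu := moduleConfig{"system", []string{"cpu"}, "1s"}
	net := moduleConfig{"system", []string{"network"}, "10s"}

	running := map[uint64]moduleConfig{cpu.hash(): cpu}
	// The network config appears twice; the cpu config has been removed.
	start, stop := reconcile(running, []moduleConfig{net, net})
	fmt.Println(len(start), len(stop)) // prints "1 1"
}
```

Keying modules by a hash of their configuration is what makes duplicate definitions collapse into a single running instance. A module whose config was edited simply shows up as one stop plus one start.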

**Changes**

* Introduce configuration variables for config reloading
* Change Metricbeat behaviour so that if config reloading is enabled (`metricbeat.reload.modules.enabled: true`), zero modules may be defined on startup. It is also possible that at some point in time no modules are running.
* Change shutdown to also shut down dynamic modules
* Expvar stats for reloading added
* Add common configuration files to Metricbeat to hold configuration other than modules
* Add basic system tests
ruflin authored and andrewkroh committed Jan 6, 2017
1 parent 7b314a5 commit 77cdb8f
Showing 23 changed files with 1,097 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Experimental Prometheus module. {pull}3202[3202]
- Add system socket module that reports all TCP sockets. {pull}3246[3246]
- Kafka consumer groups metricset. {pull}3240[3240]
- Add dynamic configuration reloading for modules. {pull}3281[3281]

*Packetbeat*

18 changes: 12 additions & 6 deletions libbeat/cfgfile/cfgfile.go
@@ -98,12 +98,7 @@ func Load(path string) (*common.Config, error) {
var config *common.Config
var err error

- cfgpath := ""
- if *configPath != "" {
- cfgpath = *configPath
- } else if *homePath != "" {
- cfgpath = *homePath
- }
+ cfgpath := GetPathConfig()

if path == "" {
list := []string{}
@@ -132,6 +127,17 @@ func Load(path string) (*common.Config, error) {
)
}

// GetPathConfig returns ${path.config}. If ${path.config} is not set, ${path.home} is returned.
func GetPathConfig() string {
if *configPath != "" {
return *configPath
} else if *homePath != "" {
return *homePath
}
// TODO: Do we need this or should we always return *homePath?
return ""
}

// IsTestConfig returns whether or not this is configuration used for testing
func IsTestConfig() bool {
return *testConfig
6 changes: 4 additions & 2 deletions metricbeat/Makefile
@@ -44,8 +44,10 @@ collect-docs: python-env
# Collects all module configs
.PHONY: configs
configs: python-env
- . ${PYTHON_ENV}/bin/activate; python ${ES_BEATS}/metricbeat/scripts/config_collector.py --beat ${BEATNAME} $(PWD) > _meta/beat.yml
- . ${PYTHON_ENV}/bin/activate; python ${ES_BEATS}/metricbeat/scripts/config_collector.py --beat ${BEATNAME} --full $(PWD) > _meta/beat.full.yml
+ cat ${ES_BEATS}/metricbeat/_meta/common.yml > _meta/beat.yml
+ . ${PYTHON_ENV}/bin/activate; python ${ES_BEATS}/metricbeat/scripts/config_collector.py --beat ${BEATNAME} $(PWD) >> _meta/beat.yml
+ cat ${ES_BEATS}/metricbeat/_meta/common.full.yml > _meta/beat.full.yml
+ . ${PYTHON_ENV}/bin/activate; python ${ES_BEATS}/metricbeat/scripts/config_collector.py --beat ${BEATNAME} --full $(PWD) >> _meta/beat.full.yml

# Generates imports for all modules and metricsets
.PHONY: imports
15 changes: 15 additions & 0 deletions metricbeat/_meta/beat.full.yml
@@ -7,6 +7,21 @@
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/metricbeat/index.html

#============================ Config Reloading ===============================

# Config reloading allows modules to be loaded dynamically. Each monitored file
# must contain one or multiple modules as a list.
metricbeat.modules.reload:

# Glob pattern for configuration reloading
path: ${path.config}/conf.d/*.yml

# Period on which files under path should be checked for changes
period: 10s

# Set to true to enable config reloading
enabled: false

#========================== Modules configuration ============================
metricbeat.modules:

23 changes: 23 additions & 0 deletions metricbeat/_meta/common.full.yml
@@ -0,0 +1,23 @@
########################## Metricbeat Configuration ###########################

# This file is a full configuration example documenting all non-deprecated
# options in comments. For a shorter configuration example, that contains only
# the most common options, please see metricbeat.yml in the same directory.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/metricbeat/index.html

#============================ Config Reloading ===============================

# Config reloading allows modules to be loaded dynamically. Each monitored file
# must contain one or multiple modules as a list.
metricbeat.modules.reload:

# Glob pattern for configuration reloading
path: ${path.config}/conf.d/*.yml

# Period on which files under path should be checked for changes
period: 10s

# Set to true to enable config reloading
enabled: false
8 changes: 8 additions & 0 deletions metricbeat/_meta/common.yml
@@ -0,0 +1,8 @@
###################### Metricbeat Configuration Example #######################

# This file is an example configuration file highlighting only the most common
# options. The metricbeat.full.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/metricbeat/index.html
29 changes: 27 additions & 2 deletions metricbeat/beater/config.go
@@ -1,9 +1,34 @@
package beater

- import "github.com/elastic/beats/libbeat/common"
+ import (
+ "time"
+
+ "github.com/elastic/beats/libbeat/common"
+ )

// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
- Modules []*common.Config `config:"modules" validate:"required"`
+ Modules []*common.Config `config:"modules"`
+ ReloadModules ModulesReloadConfig `config:"reload.modules"`
}

type ModulesReloadConfig struct {
// If path is a relative path, it is relative to the ${path.config}
Path string `config:"path"`
Period time.Duration `config:"period"`
Enabled bool `config:"enabled"`
}

var (
DefaultConfig = Config{
ReloadModules: ModulesReloadConfig{
Period: 10 * time.Second,
Enabled: false,
},
}
)

func (c *ModulesReloadConfig) IsEnabled() bool {
return c.Enabled
}
79 changes: 79 additions & 0 deletions metricbeat/beater/glob_watcher.go
@@ -0,0 +1,79 @@
package beater

import (
"os"
"path/filepath"
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/mitchellh/hashstructure"
)

type GlobWatcher struct {
glob string
lastScan time.Time
lastHash uint64
}

func NewGlobWatcher(glob string) *GlobWatcher {
return &GlobWatcher{
lastScan: time.Time{},
lastHash: 0,
glob: glob,
}
}

// Scan scans all files matched by the glob and checks if the number of files or the modtime of the files changed.
// It returns the list of files, a boolean indicating whether anything in the glob changed, and potential errors.
// To detect changes, not only the mod time is compared but also the hash of the files list. This is required to
// also detect files which were removed.
// The modtime is compared at second granularity, as mod time is normally in seconds. If it is unclear whether
// something changed, the method returns true. It is strongly recommended to call Scan no more frequently than once per second.
func (gw *GlobWatcher) Scan() ([]string, bool, error) {

globList, err := filepath.Glob(gw.glob)
if err != nil {
return nil, false, err
}

updatedFiles := false
files := []string{}

lastScan := time.Now()
defer func() { gw.lastScan = lastScan }()

for _, f := range globList {

info, err := os.Stat(f)
if err != nil {
logp.Err("Error getting stats for file: %s", f)
continue
}

// Directories are skipped
if !info.Mode().IsRegular() {
continue
}

// Check if one of the files was changed recently
// File modification time can be in seconds, so lastScan is truncated to the
// second to cover files which were created during this second.
if info.ModTime().After(gw.lastScan.Truncate(time.Second)) {
updatedFiles = true
}
files = append(files, f)
}

hash, err := hashstructure.Hash(files, nil)
if err != nil {
return files, true, err
}
defer func() { gw.lastHash = hash }()

// Check if something changed
if !updatedFiles && hash == gw.lastHash {
return files, false, nil
}

return files, true, nil
}
60 changes: 60 additions & 0 deletions metricbeat/beater/glob_watcher_test.go
@@ -0,0 +1,60 @@
package beater

import (
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestGlobWatcher(t *testing.T) {

// Create random temp directory
id := strconv.Itoa(rand.New(rand.NewSource(int64(time.Now().Nanosecond()))).Int())
dir, err := ioutil.TempDir("", id)
defer os.RemoveAll(dir)
assert.NoError(t, err)
glob := dir + "/*.yml"

gcd := NewGlobWatcher(glob)

content := []byte("test\n")
err = ioutil.WriteFile(dir+"/config1.yml", content, 0644)
assert.NoError(t, err)
err = ioutil.WriteFile(dir+"/config2.yml", content, 0644)
assert.NoError(t, err)

// Make sure not inside compensation time
time.Sleep(1 * time.Second)

files, changed, err := gcd.Scan()
assert.Equal(t, 2, len(files))
assert.NoError(t, err)
assert.True(t, changed)

files, changed, err = gcd.Scan()
files, changed, err = gcd.Scan()
assert.Equal(t, 2, len(files))
assert.NoError(t, err)
assert.False(t, changed)

err = ioutil.WriteFile(dir+"/config3.yml", content, 0644)
assert.NoError(t, err)

files, changed, err = gcd.Scan()
assert.Equal(t, 3, len(files))
assert.NoError(t, err)
assert.True(t, changed)

err = os.Remove(dir + "/config3.yml")
assert.NoError(t, err)

files, changed, err = gcd.Scan()
assert.Equal(t, 2, len(files))
assert.NoError(t, err)
assert.True(t, changed)
}
31 changes: 29 additions & 2 deletions metricbeat/beater/metricbeat.go
@@ -17,27 +17,35 @@ type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
modules []*ModuleWrapper // Active list of modules.
client publisher.Client // Publisher client.
config Config
}

// New creates and returns a new Metricbeat instance.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
logp.Info("%s", mb.Registry.String())

- config := Config{}
+ config := DefaultConfig

err := rawConfig.Unpack(&config)
if err != nil {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := NewModuleWrappers(config.Modules, mb.Registry)
if err != nil {
- return nil, err
+ // Empty config is fine if dynamic config is enabled
+ if !config.ReloadModules.IsEnabled() {
+ return nil, err
+ } else if err != mb.ErrEmptyConfig && err != mb.ErrAllModulesDisabled {
+ return nil, err
+ }
}

mb := &Metricbeat{
done: make(chan struct{}),
modules: modules,
config: config,
}
return mb, nil
}
@@ -53,6 +61,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {

// Start each module.
var cs []<-chan common.MapStr

for _, mw := range bt.modules {
c := mw.Start(bt.done)
cs = append(cs, c)
@@ -68,6 +77,24 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
PublishChannels(bt.client, cs...)
}()

if bt.config.ReloadModules.IsEnabled() {
logp.Warn("EXPERIMENTAL feature dynamic configuration reloading is enabled.")
configReloader := NewConfigReloader(bt.config.ReloadModules, b.Publisher)

// Start
wg.Add(1)
go func() {
defer wg.Done()
configReloader.Run()
}()

// Stop
go func() {
<-bt.done
configReloader.Stop()
}()
}

// Wait for PublishChannels to stop publishing.
wg.Wait()
return nil
21 changes: 21 additions & 0 deletions metricbeat/beater/module.go
@@ -12,6 +12,7 @@ import (
"github.com/elastic/beats/metricbeat/mb"

"github.com/joeshaw/multierror"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
)

@@ -36,6 +37,7 @@ type ModuleWrapper struct {
mb.Module
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
}

// metricSetWrapper contains the MetricSet and the private data associated with
@@ -153,6 +155,25 @@ func (mw *ModuleWrapper) String() string {
mw.Name(), len(mw.metricSets))
}

// Hash returns the hash value of the module wrapper.
// This allows checking whether two modules are the same, i.e. have the same config.
func (mw *ModuleWrapper) Hash() uint64 {
// Check if hash was calculated previously
if mw.configHash > 0 {
return mw.configHash
}
var err error

// Config is unpacked into map[string]interface{} to also take metricset configs into account for the hash
var c map[string]interface{}
mw.UnpackConfig(&c)
mw.configHash, err = hashstructure.Hash(c, nil)
if err != nil {
logp.Err("Error creating config hash for module %s: %s", mw.String(), err)
}
return mw.configHash
}

// metricSetWrapper methods

// startFetching performs an immediate fetch for the MetricSet then it