From def5f49d7fb527e808e067dc797764cf4f3f717a Mon Sep 17 00:00:00 2001 From: Tait Clarridge Date: Tue, 1 Dec 2015 09:15:28 -0500 Subject: [PATCH] Add support for pass/drop/tagpass/tagdrop for outputs Reuses same logic as the plugins for filtering points, should be only a marginal performance decrease to check all the points before writing to the output. Added examples to the README as well (for generic pass/drop as well as output pass/drop/tagpass/tagdrop). X-Github-Closes #398 --- README.md | 34 ++++++++++++++++ accumulator.go | 10 ++--- agent.go | 15 ++++++- internal/config/config.go | 74 ++++++++++++++++++++-------------- internal/config/config_test.go | 8 ++-- 5 files changed, 99 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 9d9ef38599396..4228f3f7b232d 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,19 @@ Below is how to configure `tagpass` and `tagdrop` parameters (added in 0.1.5) path = [ "/opt", "/home" ] ``` +Below is how to configure `pass` and `drop` parameters (added in 0.1.5) + +``` +# Drop all metrics for guest CPU usage +[[plugins.cpu]] + drop = [ "cpu_usage_guest" ] + +# Only store inode related metrics for disks +[[plugins.disk]] + pass = [ "disk_inodes" ] +``` + + Additional plugins (or outputs) of the same type can be specified, just define another instance in the config file: @@ -224,6 +237,27 @@ Telegraf also supports specifying multiple output sinks to send data to, configuring each output sink is different, but examples can be found by running `telegraf -sample-config`. +Outputs also support the same configurable options as plugins (pass, drop, tagpass, tagdrop) + +``` +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "telegraf" + # Drop all measurements that start with "aerospike" + drop = ["aerospike"] + +# Send to a different database +[[outputs.influxdb]] + urls = [ "http://localhost:8086" ] + database = "mydb" + precision = "s" + +# Only store measurements where the tag "mytag" matches the value "B" +[outputs.influxdb.tagpass] + mytag = ["B"] +``` + + ## Supported Outputs * influxdb diff --git a/accumulator.go b/accumulator.go index 2e8b61d1caaae..fe8b299d69bbf 100644 --- a/accumulator.go +++ b/accumulator.go @@ -29,12 +29,12 @@ type Accumulator interface { } func NewAccumulator( - pluginConfig *config.PluginConfig, + filterConfig *config.PrefixFilterConfig, points chan *client.Point, ) Accumulator { acc := accumulator{} acc.points = points - acc.pluginConfig = pluginConfig + acc.filterConfig = filterConfig return &acc } @@ -47,7 +47,7 @@ type accumulator struct { debug bool - pluginConfig *config.PluginConfig + filterConfig *config.PrefixFilterConfig prefix string } @@ -106,8 +106,8 @@ func (ac *accumulator) AddFields( measurement = ac.prefix + measurement } - if ac.pluginConfig != nil { - if !ac.pluginConfig.ShouldPass(measurement) || !ac.pluginConfig.ShouldTagsPass(tags) { + if ac.filterConfig != nil { + if !ac.filterConfig.ShouldPass(measurement) || !ac.filterConfig.ShouldTagsPass(tags) { return } } diff --git a/agent.go b/agent.go index e1292bffe78e5..decefa8b0e862 100644 --- a/agent.go +++ b/agent.go @@ -217,6 +217,7 @@ func (a *Agent) writeOutput( shutdown chan struct{}, wg *sync.WaitGroup, ) { + var filteredPoints []*client.Point defer wg.Done() if len(points) == 0 { return @@ -226,12 +227,22 @@ func (a *Agent) writeOutput( start := time.Now() for { - err := ro.Output.Write(points) + if (len(ro.Config.Pass) + len(ro.Config.Drop) + len(ro.Config.TagDrop) + len(ro.Config.TagPass)) > 0 { + for i := range points { + if !ro.Config.ShouldPass(points[i].Name()) || !ro.Config.ShouldTagsPass(points[i].Tags()) { + continue + } + filteredPoints = append(filteredPoints, points[i]) + } + } else { + filteredPoints = points + } + err := ro.Output.Write(filteredPoints) if err == nil { // Write successful elapsed := time.Since(start) log.Printf("Flushed %d metrics to output %s in %s\n", - len(points), ro.Name, elapsed) + len(filteredPoints), ro.Name, elapsed) return } diff --git a/internal/config/config.go b/internal/config/config.go index 5bb77f2c6a2fe..f481efd8b1b14 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -88,17 +88,18 @@ type TagFilter struct { type RunningOutput struct { Name string Output outputs.Output + Config *PrefixFilterConfig } type RunningPlugin struct { Name string Plugin plugins.Plugin - Config *PluginConfig + Config *PrefixFilterConfig } -// PluginConfig containing a name, interval, and drop/pass prefix lists +// ModuleConfig contains the name, interval (for plugin) and drop/pass prefix lists // Also lists the tags to filter -type PluginConfig struct { +type PrefixFilterConfig struct { Name string Drop []string @@ -111,10 +112,10 @@ type PluginConfig struct { } // ShouldPass returns true if the metric should pass, false if should drop -// based on the drop/pass plugin parameters -func (cp *PluginConfig) ShouldPass(measurement string) bool { - if cp.Pass != nil { - for _, pat := range cp.Pass { +// based on the drop/pass filter parameters +func (pf *PrefixFilterConfig) ShouldPass(measurement string) bool { + if pf.Pass != nil { + for _, pat := range pf.Pass { if strings.HasPrefix(measurement, pat) { return true } @@ -122,8 +123,8 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { return false } - if cp.Drop != nil { - for _, pat := range cp.Drop { + if pf.Drop != nil { + for _, pat := range pf.Drop { if strings.HasPrefix(measurement, pat) { return false } @@ -135,10 +136,10 @@ func (cp *PluginConfig) ShouldPass(measurement string) bool { } // ShouldTagsPass returns true if the metric should pass, false if should drop -// based on the tagdrop/tagpass plugin parameters -func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { - if cp.TagPass != nil { - for _, pat := range cp.TagPass { +// based on the tagdrop/tagpass filter parameters +func (pf *PrefixFilterConfig) ShouldTagsPass(tags map[string]string) bool { + if pf.TagPass != nil { + for _, pat := range pf.TagPass { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -150,8 +151,8 @@ func (cp *PluginConfig) ShouldTagsPass(tags map[string]string) bool { return false } - if cp.TagDrop != nil { - for _, pat := range cp.TagDrop { + if pf.TagDrop != nil { + for _, pat := range pf.TagDrop { if tagval, ok := tags[pat.Name]; ok { for _, filter := range pat.Filter { if filter == tagval { @@ -469,15 +470,21 @@ func (c *Config) addOutput(name string, table *ast.Table) error { if !ok { return fmt.Errorf("Undefined but requested output: %s", name) } - o := creator() + output := creator() - if err := toml.UnmarshalTable(table, o); err != nil { + filterConfig, err := buildFilters(name, table) + if err != nil { + return err + } + + if err := toml.UnmarshalTable(table, output); err != nil { return err } ro := &RunningOutput{ Name: name, - Output: o, + Output: output, + Config: filterConfig, } c.Outputs = append(c.Outputs, ro) return nil @@ -493,31 +500,36 @@ func (c *Config) addPlugin(name string, table *ast.Table) error { } plugin := creator() - pluginConfig, err := applyPlugin(name, table, plugin) + filterConfig, err := buildFilters(name, table) if err != nil { return err } + + if err := toml.UnmarshalTable(table, plugin); err != nil { + return err + } + rp := &RunningPlugin{ Name: name, Plugin: plugin, - Config: pluginConfig, + Config: filterConfig, } c.Plugins = append(c.Plugins, rp) return nil } -// applyPlugin takes defined plugin names and applies them to the given -// interface, returning a PluginConfig object in the end that can -// be inserted into a runningPlugin by the agent. -func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, error) { - cp := &PluginConfig{Name: name} +// buildFilters builds a PrefixFilterConfig (tagpass/tagdrop/pass/drop) to +// be inserted into the RunningPlugin/RunningOutput to be used for prefix +// filtering on both plugins and outputs +func buildFilters(name string, tbl *ast.Table) (*PrefixFilterConfig, error) { + pf := &PrefixFilterConfig{Name: name} if node, ok := tbl.Fields["pass"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Pass = append(cp.Pass, str.Value) + pf.Pass = append(pf.Pass, str.Value) } } } @@ -529,7 +541,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, if ary, ok := kv.Value.(*ast.Array); ok { for _, elem := range ary.Value { if str, ok := elem.(*ast.String); ok { - cp.Drop = append(cp.Drop, str.Value) + pf.Drop = append(pf.Drop, str.Value) } } } @@ -544,7 +556,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, return nil, err } - cp.Interval = dur + pf.Interval = dur } } } @@ -561,7 +573,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagPass = append(cp.TagPass, *tagfilter) + pf.TagPass = append(pf.TagPass, *tagfilter) } } } @@ -579,7 +591,7 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, } } } - cp.TagDrop = append(cp.TagDrop, *tagfilter) + pf.TagDrop = append(pf.TagDrop, *tagfilter) } } } @@ -590,5 +602,5 @@ func applyPlugin(name string, tbl *ast.Table, p plugins.Plugin) (*PluginConfig, delete(tbl.Fields, "interval") delete(tbl.Fields, "tagdrop") delete(tbl.Fields, "tagpass") - return cp, toml.UnmarshalTable(tbl, p) + return pf, nil } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index e5391122423ec..c0e12e093cb9d 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -18,7 +18,7 @@ func TestConfig_LoadSinglePlugin(t *testing.T) { memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &PluginConfig{ + mConfig := &PrefixFilterConfig{ Name: "memcached", Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, @@ -57,7 +57,7 @@ func TestConfig_LoadDirectory(t *testing.T) { memcached := plugins.Plugins["memcached"]().(*memcached.Memcached) memcached.Servers = []string{"localhost"} - mConfig := &PluginConfig{ + mConfig := &PrefixFilterConfig{ Name: "memcached", Drop: []string{"other", "stuff"}, Pass: []string{"some", "strings"}, @@ -87,7 +87,7 @@ func TestConfig_LoadDirectory(t *testing.T) { Name: "myothercollector", }, } - eConfig := &PluginConfig{Name: "exec"} + eConfig := &PrefixFilterConfig{Name: "exec"} assert.Equal(t, ex, c.Plugins[1].Plugin, "Merged Testdata did not produce a correct exec struct.") assert.Equal(t, eConfig, c.Plugins[1].Config, @@ -109,7 +109,7 @@ func TestConfig_LoadDirectory(t *testing.T) { }, } - pConfig := &PluginConfig{Name: "procstat"} + pConfig := &PrefixFilterConfig{Name: "procstat"} assert.Equal(t, pstat, c.Plugins[3].Plugin, "Merged Testdata did not produce a correct procstat struct.")