Skip to content

Commit

Permalink
Statsd listener plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Oct 3, 2015
1 parent 64a3a71 commit 1a37001
Show file tree
Hide file tree
Showing 9 changed files with 506 additions and 31 deletions.
2 changes: 0 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ and submit new plugins.
### Plugin Guidelines

* A plugin must conform to the `plugins.Plugin` interface.
* Telegraf promises to run each plugin's Gather function serially. This means
developers don't have to worry about thread safety within these functions.
* Each generated metric automatically has the name of the plugin that generated
it prepended. This is to keep plugins honest.
* Plugins should call `plugins.Add` in their `init` function to register themselves.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ build-linux-bins: prepare
./cmd/telegraf/telegraf.go

prepare:
go get github.com/tools/godep
# go get github.com/tools/godep

docker-compose:
ifeq ($(UNAME), Darwin)
Expand Down
14 changes: 14 additions & 0 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@ func (a *Agent) Run(shutdown chan struct{}) error {
var wg sync.WaitGroup

for _, plugin := range a.plugins {

// Start service of any ServicePlugins
switch p := plugin.plugin.(type) {
case plugins.ServicePlugin:
if err := p.Start(); err != nil {
log.Printf("Service for plugin %s failed to start, exiting\n%s\n",
plugin.name, err.Error())
return err
}
defer p.Stop()
}

// Special handling for plugins that have their own collection interval
// configured. Default intervals are handled below with crankParallel
if plugin.config.Interval != 0 {
wg.Add(1)
go func(plugin *runningPlugin) {
Expand Down
52 changes: 37 additions & 15 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,18 +377,25 @@ var header = `# Telegraf configuration
[outputs]
`

var header2 = `
var pluginHeader = `
###############################################################################
# PLUGINS #
###############################################################################
`

var servicePluginHeader = `
###############################################################################
# SERVICE PLUGINS #
###############################################################################
`

// PrintSampleConfig prints the sample config
func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
fmt.Printf(header)

// Print Outputs
// Filter outputs
var onames []string
for oname := range outputs.Outputs {
if len(outputFilters) == 0 || sliceContains(oname, outputFilters) {
Expand All @@ -397,6 +404,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
sort.Strings(onames)

// Print Outputs
for _, oname := range onames {
creator := outputs.Outputs[oname]
output := creator()
Expand All @@ -411,9 +419,7 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
}

fmt.Printf(header2)

// Print Plugins
// Filter plugins
var pnames []string
for pname := range plugins.Plugins {
if len(pluginFilters) == 0 || sliceContains(pname, pluginFilters) {
Expand All @@ -422,18 +428,36 @@ func PrintSampleConfig(pluginFilters []string, outputFilters []string) {
}
sort.Strings(pnames)

// Print Plugins
fmt.Printf(pluginHeader)
servPlugins := make(map[string]plugins.ServicePlugin)
for _, pname := range pnames {
creator := plugins.Plugins[pname]
plugin := creator()

fmt.Printf("\n# %s\n[%s]", plugin.Description(), pname)

config := plugin.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
switch p := plugin.(type) {
case plugins.ServicePlugin:
servPlugins[pname] = p
continue
}

printConfig(pname, plugin)
}

// Print Service Plugins
fmt.Printf(servicePluginHeader)
for name, plugin := range servPlugins {
printConfig(name, plugin)
}
}

func printConfig(name string, plugin plugins.Plugin) {
fmt.Printf("\n# %s\n[%s]", plugin.Description(), name)
config := plugin.SampleConfig()
if config == "" {
fmt.Printf("\n # no configuration\n")
} else {
fmt.Printf(config)
}
}

Expand All @@ -449,9 +473,7 @@ func sliceContains(name string, list []string) bool {
// PrintPluginConfig prints the config usage of a single plugin.
func PrintPluginConfig(name string) error {
if creator, ok := plugins.Plugins[name]; ok {
plugin := creator()
fmt.Printf("# %s\n[%s]", plugin.Description(), name)
fmt.Printf(plugin.SampleConfig())
printConfig(name, creator())
} else {
return errors.New(fmt.Sprintf("Plugin %s not found", name))
}
Expand Down
22 changes: 9 additions & 13 deletions outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,18 @@ type InfluxDB struct {

var sampleConfig = `
# The full HTTP endpoint URL for your InfluxDB instance
# Multiple urls can be specified for InfluxDB cluster support. Server to
# write to will be randomly chosen each interval.
urls = ["http://localhost:8086"] # required.
# The target database for metrics. This database must already exist
database = "telegraf" # required.
# Connection timeout (for the connection with InfluxDB), formatted as a string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# If not provided, will default to 0 (no timeout)
# Multiple urls can be specified for InfluxDB cluster support.
urls = ["http://localhost:8086"] # required
# The target database for metrics (telegraf will create it if not exists)
database = "telegraf" # required
# # Connection timeout (for the connection with InfluxDB), formatted as a string.
# # Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
# # If not provided, will default to 0 (no timeout)
# timeout = "5s"
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
# Set the user agent for the POSTs (can be useful for log differentiation)
# # Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
`

Expand Down
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import (
_ "github.com/influxdb/telegraf/plugins/rabbitmq"
_ "github.com/influxdb/telegraf/plugins/redis"
_ "github.com/influxdb/telegraf/plugins/rethinkdb"
_ "github.com/influxdb/telegraf/plugins/statsd"
_ "github.com/influxdb/telegraf/plugins/system"
)
24 changes: 24 additions & 0 deletions plugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,35 @@ type Accumulator interface {
}

type Plugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string

// Description returns a one-sentence description on the Plugin
Description() string

// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error
}

type ServicePlugin interface {
// SampleConfig returns the default configuration of the Plugin
SampleConfig() string

// Description returns a one-sentence description on the Plugin
Description() string

// Gather takes in an accumulator and adds the metrics that the Plugin
// gathers. This is called every "interval"
Gather(Accumulator) error

// Start starts the ServicePlugin's service, whatever that may be
Start() error

// Stop stops the services and closes any necessary channels and connections
Stop()
}

type Creator func() Plugin

var Plugins = map[string]Creator{}
Expand Down
Loading

0 comments on commit 1a37001

Please sign in to comment.