Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple Outputs Mark II #107

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pkg/
tivan
.vagrant
telegraf
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- [#85](https://github.com/influxdb/telegraf/pull/85): Fix GetLocalHost testutil function for mac users
- [#89](https://github.com/influxdb/telegraf/pull/89): go fmt fixes
- [#94](https://github.com/influxdb/telegraf/pull/94): Fix for issue #93, explicitly call sarama.v1 -> sarama
- [#101](https://github.com/influxdb/telegraf/issues/101): switch back from master branch if building locally

## v0.1.4 [2015-07-09]

Expand Down
131 changes: 72 additions & 59 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package telegraf
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
)

type runningOutput struct {
name string
output outputs.Output
}

type runningPlugin struct {
name string
plugin plugins.Plugin
Expand All @@ -32,9 +36,8 @@ type Agent struct {

Config *Config

outputs []*runningOutput
plugins []*runningPlugin

conn *client.Client
}

// NewAgent returns an Agent struct based off the given Config
Expand Down Expand Up @@ -66,36 +69,39 @@ func NewAgent(config *Config) (*Agent, error) {

// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
config := a.Config

u, err := url.Parse(config.URL)
if err != nil {
return err
for _, o := range a.outputs {
err := o.output.Connect()
if err != nil {
return err
}
}
return nil
}

c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
Timeout: config.Timeout.Duration,
})
// LoadOutputs loads the agent's outputs
func (a *Agent) LoadOutputs() ([]string, error) {
var names []string

if err != nil {
return err
}
for _, name := range a.Config.OutputsDeclared() {
creator, ok := outputs.Outputs[name]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}

_, err = c.Query(client.Query{
Command: fmt.Sprintf("CREATE DATABASE telegraf"),
})
output := creator()

err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}

if err != nil && !strings.Contains(err.Error(), "database already exists") {
log.Fatal(err)
a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
}

a.conn = c
sort.Strings(names)

return nil
return names, nil
}

// LoadPlugins loads the agent's plugins
Expand Down Expand Up @@ -174,61 +180,59 @@ func (a *Agent) crankParallel() error {

close(points)

var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
var bp BatchPoints
bp.Time = time.Now()
bp.Tags = a.Config.Tags

for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
bp.Points = append(bp.Points, sub.Points...)
}

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crank() error {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

for _, plugin := range a.plugins {
acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Time = time.Now()
bp.Tags = a.Config.Tags

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
ticker := time.NewTicker(plugin.config.Interval)

for {
var acc BatchPoints
var bp BatchPoints

acc.Debug = a.Debug
bp.Debug = a.Debug

acc.Prefix = plugin.name + "_"
acc.Config = plugin.config
err := plugin.plugin.Gather(&acc)
bp.Prefix = plugin.name + "_"
bp.Config = plugin.config
err := plugin.plugin.Gather(&bp)
if err != nil {
return err
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
bp.Tags = a.Config.Tags
bp.Time = time.Now()

a.conn.Write(acc.BatchPoints)
err = a.flush(&bp)
if err != nil {
return err
}

select {
case <-shutdown:
Expand All @@ -239,6 +243,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}

func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
}(o)
}

wg.Wait()

return outerr
}

// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
Expand Down Expand Up @@ -297,13 +317,6 @@ func (a *Agent) Test() error {

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()
if err != nil {
return err
}
}

var wg sync.WaitGroup

for _, plugin := range a.plugins {
Expand Down
29 changes: 19 additions & 10 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/influxdb/telegraf"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all"
)

Expand All @@ -21,16 +22,13 @@ var fPidfile = flag.String("pidfile", "", "file to write our pid to")
var fPLuginsFilter = flag.String("filter", "", "filter the plugins to enable, separator is :")

// Telegraf version
var Version = "unreleased"

// Telegraf commit
var Commit = ""
var Version = "0.1.5-dev"

func main() {
flag.Parse()

if *fVersion {
fmt.Printf("InfluxDB Telegraf agent - Version %s\n", Version)
fmt.Printf("Telegraf - Version %s\n", Version)
return
}

Expand Down Expand Up @@ -62,6 +60,15 @@ func main() {
ag.Debug = true
}

outputs, err := ag.LoadOutputs()
if err != nil {
log.Fatal(err)
}
if len(outputs) == 0 {
log.Printf("Error: no outputs found, did you provide a config file?")
os.Exit(1)
}

plugins, err := ag.LoadPlugins(*fPLuginsFilter)
if err != nil {
log.Fatal(err)
Expand All @@ -70,6 +77,10 @@ func main() {
log.Printf("Error: no plugins found, did you provide a config file?")
os.Exit(1)
}
if len(plugins) == 0 {
log.Printf("Error: no plugins found, did you provide a config file?")
os.Exit(1)
}

if *fTest {
if *fConfig != "" {
Expand Down Expand Up @@ -101,18 +112,16 @@ func main() {
close(shutdown)
}()

log.Print("InfluxDB Agent running")
log.Printf("Starting Telegraf (version %s)\n", Version)
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}

if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}
log.Printf("Tags enabled: %v", config.ListTags())

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
Expand Down
Loading