From 3c7c8926fb436e683c6a74201b1c90f390ef419f Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 9 Sep 2015 15:56:10 -0600 Subject: [PATCH] Support InfluxDB clusters Closes #143 --- CHANGELOG.md | 5 ++ agent.go | 3 ++ etc/config.sample.toml | 2 +- outputs/influxdb/influxdb.go | 91 ++++++++++++++++++++++++++---------- 4 files changed, 76 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1222c97287837..77c8dac8f25dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ ## v0.1.9 [unreleased] +### Release Notes +- InfluxDB output config change: `url` is now `urls`, and is a list. Config files +will still be backwards compatible if only `url` is specified. + ### Features +- [#143](https://github.com/influxdb/telegraf/issues/143): InfluxDB clustering support ### Bugfixes - [#170](https://github.com/influxdb/telegraf/issues/170): Systemd support diff --git a/agent.go b/agent.go index 2bfa0cf8f1f07..1f40eb04b71cc 100644 --- a/agent.go +++ b/agent.go @@ -84,6 +84,9 @@ func NewAgent(config *Config) (*Agent, error) { // Connect connects to all configured outputs func (a *Agent) Connect() error { for _, o := range a.outputs { + if a.Debug { + log.Printf("Attempting connection to output: %s\n", o.name) + } err := o.output.Connect() if err != nil { return err diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 5307b6df890d7..58c851bece6c2 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -37,7 +37,7 @@ [outputs] [outputs.influxdb] # The full HTTP endpoint URL for your InfluxDB instance - url = "http://localhost:8086" # required. + urls = ["http://localhost:8086"] # required. # The target database for metrics. This database must already exist database = "telegraf" # required. diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go index 5bb74b4e33424..c47b6cd8cf991 100644 --- a/outputs/influxdb/influxdb.go +++ b/outputs/influxdb/influxdb.go @@ -1,8 +1,10 @@ package influxdb import ( + "errors" "fmt" "log" + "math/rand" "net/url" "strings" @@ -12,19 +14,23 @@ import ( ) type InfluxDB struct { + // URL is only for backwards compatability URL string + URLs []string `toml:"urls"` Username string Password string Database string UserAgent string Timeout t.Duration - conn *client.Client + conns []*client.Client } var sampleConfig = ` # The full HTTP endpoint URL for your InfluxDB instance - url = "http://localhost:8086" # required. + # Multiple urls can be specified for InfluxDB cluster support. Server to + # write to will be randomly chosen each interval. + urls = ["http://localhost:8086"] # required. # The target database for metrics. This database must already exist database = "telegraf" # required. @@ -42,33 +48,58 @@ var sampleConfig = ` ` func (i *InfluxDB) Connect() error { - u, err := url.Parse(i.URL) - if err != nil { - return err + var urls []*url.URL + for _, URL := range i.URLs { + u, err := url.Parse(URL) + if err != nil { + return err + } + urls = append(urls, u) } - c, err := client.NewClient(client.Config{ - URL: *u, - Username: i.Username, - Password: i.Password, - UserAgent: i.UserAgent, - Timeout: i.Timeout.Duration, - }) + // Backward-compatability with single Influx URL config files + // This could eventually be removed in favor of specifying the urls as a list + if i.URL != "" { + u, err := url.Parse(i.URL) + if err != nil { + return err + } + urls = append(urls, u) + } - if err != nil { - return err + var conns []*client.Client + for _, parsed_url := range urls { + c, err := client.NewClient(client.Config{ + URL: *parsed_url, + Username: i.Username, + Password: i.Password, + UserAgent: i.UserAgent, + Timeout: i.Timeout.Duration, + }) + if err != nil { + return err + } + conns = append(conns, c) } - _, err = c.Query(client.Query{ - Command: fmt.Sprintf("CREATE DATABASE %s", i.Database), - }) + // This will get set to nil if a successful connection is made + err := errors.New("Could not create database on any server") + + for _, conn := range conns { + _, e := conn.Query(client.Query{ + Command: fmt.Sprintf("CREATE DATABASE %s", i.Database), + }) - if err != nil && !strings.Contains(err.Error(), "database already exists") { - log.Fatal(err) + if e != nil && !strings.Contains(e.Error(), "database already exists") { + log.Println("ERROR: " + e.Error()) + } else { + err = nil + break + } } - i.conn = c - return nil + i.conns = conns + return err } func (i *InfluxDB) Close() error { @@ -84,12 +115,24 @@ func (i *InfluxDB) Description() string { return "Configuration for influxdb server to send metrics to" } +// Choose a random server in the cluster to write to until a successful write +// occurs, logging each unsuccessful. If all servers fail, return error. func (i *InfluxDB) Write(bp client.BatchPoints) error { bp.Database = i.Database - if _, err := i.conn.Write(bp); err != nil { - return err + + // This will get set to nil if a successful write occurs + err := errors.New("Could not write to any InfluxDB server in cluster") + + p := rand.Perm(len(i.conns)) + for _, n := range p { + if _, e := i.conns[n].Write(bp); e != nil { + log.Println("ERROR: " + e.Error()) + } else { + err = nil + break + } } - return nil + return err } func init() {