Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add riak plugin #727

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ Currently implemented sources:
* raindrops
* redis
* rethinkdb
* riak
* sql server (microsoft)
* twemproxy
* zfs
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/riak"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
Expand Down
76 changes: 76 additions & 0 deletions plugins/inputs/riak/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Riak Plugin

The Riak plugin gathers metrics from one or more riak instances.

### Configuration:

```toml
# Description
[[inputs.riak]]
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
```

### Measurements & Fields:

Riak provides one measurement named "riak", with the following fields:

- cpu_avg1
- cpu_avg15
- cpu_avg5
- memory_code
- memory_ets
- memory_processes
- memory_system
- memory_total
- node_get_fsm_objsize_100
- node_get_fsm_objsize_95
- node_get_fsm_objsize_99
- node_get_fsm_objsize_mean
- node_get_fsm_objsize_median
- node_get_fsm_siblings_100
- node_get_fsm_siblings_95
- node_get_fsm_siblings_99
- node_get_fsm_siblings_mean
- node_get_fsm_siblings_median
- node_get_fsm_time_100
- node_get_fsm_time_95
- node_get_fsm_time_99
- node_get_fsm_time_mean
- node_get_fsm_time_median
- node_gets
- node_gets_total
- node_put_fsm_time_100
- node_put_fsm_time_95
- node_put_fsm_time_99
- node_put_fsm_time_mean
- node_put_fsm_time_median
- node_puts
- node_puts_total
- pbc_active
- pbc_connects
- pbc_connects_total
- vnode_gets
- vnode_gets_total
- vnode_index_reads
- vnode_index_reads_total
- vnode_index_writes
- vnode_index_writes_total
- vnode_puts
- vnode_puts_total

Measurements of time (such as node_get_fsm_time_mean) are measured in nanoseconds.

### Tags:

All measurements have the following tags:

- server (the host:port of the given server address, ex. `127.0.0.1:8087`)
- nodename (the internal node name received, ex. `riak@127.0.0.1`)

### Example Output:

```
$ ./telegraf -config telegraf.conf -input-filter riak -test
> riak,nodename=riak@127.0.0.1,server=localhost:8098 cpu_avg1=31i,cpu_avg15=69i,cpu_avg5=51i,memory_code=11563738i,memory_ets=5925872i,memory_processes=30236069i,memory_system=93074971i,memory_total=123311040i,node_get_fsm_objsize_100=0i,node_get_fsm_objsize_95=0i,node_get_fsm_objsize_99=0i,node_get_fsm_objsize_mean=0i,node_get_fsm_objsize_median=0i,node_get_fsm_siblings_100=0i,node_get_fsm_siblings_95=0i,node_get_fsm_siblings_99=0i,node_get_fsm_siblings_mean=0i,node_get_fsm_siblings_median=0i,node_get_fsm_time_100=0i,node_get_fsm_time_95=0i,node_get_fsm_time_99=0i,node_get_fsm_time_mean=0i,node_get_fsm_time_median=0i,node_gets=0i,node_gets_total=19i,node_put_fsm_time_100=0i,node_put_fsm_time_95=0i,node_put_fsm_time_99=0i,node_put_fsm_time_mean=0i,node_put_fsm_time_median=0i,node_puts=0i,node_puts_total=0i,pbc_active=0i,pbc_connects=0i,pbc_connects_total=20i,vnode_gets=0i,vnode_gets_total=57i,vnode_index_reads=0i,vnode_index_reads_total=0i,vnode_index_writes=0i,vnode_index_writes_total=0i,vnode_puts=0i,vnode_puts_total=0i 1455913392622482332
```
196 changes: 196 additions & 0 deletions plugins/inputs/riak/riak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package riak

import (
"encoding/json"
"fmt"
"net/http"
"net/url"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Type Riak gathers statistics from one or more Riak instances
type Riak struct {
// Servers is a slice of servers as http addresses (ex. http://127.0.0.1:8098)
Servers []string

client *http.Client
}

// NewRiak return a new instance of Riak with a default http client
func NewRiak() *Riak {
return &Riak{client: http.DefaultClient}
}

// Type riakStats represents the data that is received from Riak
type riakStats struct {
CpuAvg1 int64 `json:"cpu_avg1"`
CpuAvg15 int64 `json:"cpu_avg15"`
CpuAvg5 int64 `json:"cpu_avg5"`
MemoryCode int64 `json:"memory_code"`
MemoryEts int64 `json:"memory_ets"`
MemoryProcesses int64 `json:"memory_processes"`
MemorySystem int64 `json:"memory_system"`
MemoryTotal int64 `json:"memory_total"`
NodeGetFsmObjsize100 int64 `json:"node_get_fsm_objsize_100"`
NodeGetFsmObjsize95 int64 `json:"node_get_fsm_objsize_95"`
NodeGetFsmObjsize99 int64 `json:"node_get_fsm_objsize_99"`
NodeGetFsmObjsizeMean int64 `json:"node_get_fsm_objsize_mean"`
NodeGetFsmObjsizeMedian int64 `json:"node_get_fsm_objsize_median"`
NodeGetFsmSiblings100 int64 `json:"node_get_fsm_siblings_100"`
NodeGetFsmSiblings95 int64 `json:"node_get_fsm_siblings_95"`
NodeGetFsmSiblings99 int64 `json:"node_get_fsm_siblings_99"`
NodeGetFsmSiblingsMean int64 `json:"node_get_fsm_siblings_mean"`
NodeGetFsmSiblingsMedian int64 `json:"node_get_fsm_siblings_median"`
NodeGetFsmTime100 int64 `json:"node_get_fsm_time_100"`
NodeGetFsmTime95 int64 `json:"node_get_fsm_time_95"`
NodeGetFsmTime99 int64 `json:"node_get_fsm_time_99"`
NodeGetFsmTimeMean int64 `json:"node_get_fsm_time_mean"`
NodeGetFsmTimeMedian int64 `json:"node_get_fsm_time_median"`
NodeGets int64 `json:"node_gets"`
NodeGetsTotal int64 `json:"node_gets_total"`
Nodename string `json:"nodename"`
NodePutFsmTime100 int64 `json:"node_put_fsm_time_100"`
NodePutFsmTime95 int64 `json:"node_put_fsm_time_95"`
NodePutFsmTime99 int64 `json:"node_put_fsm_time_99"`
NodePutFsmTimeMean int64 `json:"node_put_fsm_time_mean"`
NodePutFsmTimeMedian int64 `json:"node_put_fsm_time_median"`
NodePuts int64 `json:"node_puts"`
NodePutsTotal int64 `json:"node_puts_total"`
PbcActive int64 `json:"pbc_active"`
PbcConnects int64 `json:"pbc_connects"`
PbcConnectsTotal int64 `json:"pbc_connects_total"`
VnodeGets int64 `json:"vnode_gets"`
VnodeGetsTotal int64 `json:"vnode_gets_total"`
VnodeIndexReads int64 `json:"vnode_index_reads"`
VnodeIndexReadsTotal int64 `json:"vnode_index_reads_total"`
VnodeIndexWrites int64 `json:"vnode_index_writes"`
VnodeIndexWritesTotal int64 `json:"vnode_index_writes_total"`
VnodePuts int64 `json:"vnode_puts"`
VnodePutsTotal int64 `json:"vnode_puts_total"`
}

// A sample configuration to only gather stats from localhost, default port.
const sampleConfig = `
# Specify a list of one or more riak http servers
servers = ["http://localhost:8098"]
`

// Returns a sample configuration for the plugin
func (r *Riak) SampleConfig() string {
return sampleConfig
}

// Returns a description of the plugin
func (r *Riak) Description() string {
return "Read metrics one or many Riak servers"
}

// Reads stats from all configured servers.
func (r *Riak) Gather(acc telegraf.Accumulator) error {
// Default to a single server at localhost (default port) if none specified
if len(r.Servers) == 0 {
r.Servers = []string{"http://127.0.0.1:8098"}
}

// Range over all servers, gathering stats. Returns early in case of any error.
for _, s := range r.Servers {
if err := r.gatherServer(s, acc); err != nil {
return err
}
}

return nil
}

// Gathers stats from a single server, adding them to the accumulator
func (r *Riak) gatherServer(s string, acc telegraf.Accumulator) error {
// Parse the given URL to extract the server tag
u, err := url.Parse(s)
if err != nil {
return fmt.Errorf("riak unable to parse given server url %s: %s", s, err)
}

// Perform the GET request to the riak /stats endpoint
resp, err := r.client.Get(s + "/stats")
if err != nil {
return err
}
defer resp.Body.Close()

// Successful responses will always return status code 200
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("riak responded with unexepcted status code %d", resp.StatusCode)
}

// Decode the response JSON into a new stats struct
stats := &riakStats{}
if err := json.NewDecoder(resp.Body).Decode(stats); err != nil {
return fmt.Errorf("unable to decode riak response: %s", err)
}

// Build a map of tags
tags := map[string]string{
"nodename": stats.Nodename,
"server": u.Host,
}

// Build a map of field values
fields := map[string]interface{}{
"cpu_avg1": stats.CpuAvg1,
"cpu_avg15": stats.CpuAvg15,
"cpu_avg5": stats.CpuAvg5,
"memory_code": stats.MemoryCode,
"memory_ets": stats.MemoryEts,
"memory_processes": stats.MemoryProcesses,
"memory_system": stats.MemorySystem,
"memory_total": stats.MemoryTotal,
"node_get_fsm_objsize_100": stats.NodeGetFsmObjsize100,
"node_get_fsm_objsize_95": stats.NodeGetFsmObjsize95,
"node_get_fsm_objsize_99": stats.NodeGetFsmObjsize99,
"node_get_fsm_objsize_mean": stats.NodeGetFsmObjsizeMean,
"node_get_fsm_objsize_median": stats.NodeGetFsmObjsizeMedian,
"node_get_fsm_siblings_100": stats.NodeGetFsmSiblings100,
"node_get_fsm_siblings_95": stats.NodeGetFsmSiblings95,
"node_get_fsm_siblings_99": stats.NodeGetFsmSiblings99,
"node_get_fsm_siblings_mean": stats.NodeGetFsmSiblingsMean,
"node_get_fsm_siblings_median": stats.NodeGetFsmSiblingsMedian,
"node_get_fsm_time_100": stats.NodeGetFsmTime100,
"node_get_fsm_time_95": stats.NodeGetFsmTime95,
"node_get_fsm_time_99": stats.NodeGetFsmTime99,
"node_get_fsm_time_mean": stats.NodeGetFsmTimeMean,
"node_get_fsm_time_median": stats.NodeGetFsmTimeMedian,
"node_gets": stats.NodeGets,
"node_gets_total": stats.NodeGetsTotal,
"node_put_fsm_time_100": stats.NodePutFsmTime100,
"node_put_fsm_time_95": stats.NodePutFsmTime95,
"node_put_fsm_time_99": stats.NodePutFsmTime99,
"node_put_fsm_time_mean": stats.NodePutFsmTimeMean,
"node_put_fsm_time_median": stats.NodePutFsmTimeMedian,
"node_puts": stats.NodePuts,
"node_puts_total": stats.NodePutsTotal,
"pbc_active": stats.PbcActive,
"pbc_connects": stats.PbcConnects,
"pbc_connects_total": stats.PbcConnectsTotal,
"vnode_gets": stats.VnodeGets,
"vnode_gets_total": stats.VnodeGetsTotal,
"vnode_index_reads": stats.VnodeIndexReads,
"vnode_index_reads_total": stats.VnodeIndexReadsTotal,
"vnode_index_writes": stats.VnodeIndexWrites,
"vnode_index_writes_total": stats.VnodeIndexWritesTotal,
"vnode_puts": stats.VnodePuts,
"vnode_puts_total": stats.VnodePutsTotal,
}

// Accumulate the tags and values
acc.AddFields("riak", fields, tags)

return nil
}

func init() {
inputs.Add("riak", func() telegraf.Input {
return NewRiak()
})
}
Loading