Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add RethinkDB plugin #47

Merged
merged 1 commit into from
Jul 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions plugins/rethinkdb/rethinkdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package rethinkdb

import (
"fmt"
"net/url"
"sync"

"github.com/influxdb/telegraf/plugins"

"gopkg.in/dancannon/gorethink.v1"
)

type RethinkDB struct {
Servers []string
}

var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie rethinkdb://user:auth_key@10.10.3.30:28105,
# rethinkdb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 28015 as the port.
servers = ["127.0.0.1:28015"]`

func (r *RethinkDB) SampleConfig() string {
return sampleConfig
}

func (r *RethinkDB) Description() string {
return "Read metrics from one or many RethinkDB servers"
}

var localhost = &Server{Url: &url.URL{Host: "127.0.0.1:28015"}}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *RethinkDB) Gather(acc plugins.Accumulator) error {
if len(r.Servers) == 0 {
r.gatherServer(localhost, acc)
return nil
}

var wg sync.WaitGroup

var outerr error

for _, serv := range r.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
}
wg.Add(1)
go func(serv string) {
defer wg.Done()
outerr = r.gatherServer(&Server{Url: u}, acc)
}(serv)
}

wg.Wait()

return outerr
}

func (r *RethinkDB) gatherServer(server *Server, acc plugins.Accumulator) error {
var err error
connectOpts := gorethink.ConnectOpts{
Address: server.Url.Host,
DiscoverHosts: false,
}
if server.Url.User != nil {
pwd, set := server.Url.User.Password()
if set && pwd != "" {
connectOpts.AuthKey = pwd
}
}
server.session, err = gorethink.Connect(connectOpts)
if err != nil {
return fmt.Errorf("Unable to connect to RethinkDB, %s\n", err.Error())
}
defer server.session.Close()

return server.gatherData(acc)
}

func init() {
plugins.Add("rethinkdb", func() plugins.Plugin {
return &RethinkDB{}
})
}
110 changes: 110 additions & 0 deletions plugins/rethinkdb/rethinkdb_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package rethinkdb

import (
"reflect"
"time"

"github.com/influxdb/telegraf/plugins"
)

type serverStatus struct {
Id string `gorethink:"id"`
Network struct {
Addresses []Address `gorethink:"canonical_addresses"`
Hostname string `gorethink:"hostname"`
DriverPort int `gorethink:"reql_port"`
} `gorethink:"network"`
Process struct {
Version string `gorethink:"version"`
RunningSince time.Time `gorethink:"time_started"`
} `gorethink:"process"`
}

type Address struct {
Host string `gorethink:"host"`
Port int `gorethink:"port"`
}

type stats struct {
Engine Engine `gorethink:"query_engine"`
}

type Engine struct {
ClientConns int64 `gorethink:"client_connections,omitempty"`
ClientActive int64 `gorethink:"clients_active,omitempty"`
QueriesPerSec int64 `gorethink:"queries_per_sec,omitempty"`
TotalQueries int64 `gorethink:"queries_total,omitempty"`
ReadsPerSec int64 `gorethink:"read_docs_per_sec,omitempty"`
TotalReads int64 `gorethink:"read_docs_total,omitempty"`
WritesPerSec int64 `gorethink:"written_docs_per_sec,omitempty"`
TotalWrites int64 `gorethink:"written_docs_total,omitempty"`
}

type tableStatus struct {
Id string `gorethink:"id"`
DB string `gorethink:"db"`
Name string `gorethink:"name"`
}

type tableStats struct {
Engine Engine `gorethink:"query_engine"`
Storage Storage `gorethink:"storage_engine"`
}

type Storage struct {
Cache Cache `gorethink:"cache"`
Disk Disk `gorethink:"disk"`
}

type Cache struct {
BytesInUse int64 `gorethink:"in_use_bytes"`
}

type Disk struct {
ReadBytesPerSec int64 `gorethink:"read_bytes_per_sec"`
ReadBytesTotal int64 `gorethink:"read_bytes_total"`
WriteBytesPerSec int64 `gorethik:"written_bytes_per_sec"`
WriteBytesTotal int64 `gorethink:"written_bytes_total"`
SpaceUsage SpaceUsage `gorethink:"space_usage"`
}

type SpaceUsage struct {
Data int64 `gorethink:"data_bytes"`
Garbage int64 `gorethink:"garbage_bytes"`
Metadata int64 `gorethink:"metadata_bytes"`
Prealloc int64 `gorethink:"preallocated_bytes"`
}

var engineStats = map[string]string{
"active_clients": "ClientActive",
"clients": "ClientConns",
"queries_per_sec": "QueriesPerSec",
"total_queries": "TotalQueries",
"read_docs_per_sec": "ReadsPerSec",
"total_reads": "TotalReads",
"written_docs_per_sec": "WritesPerSec",
"total_writes": "TotalWrites",
}

func (e *Engine) AddEngineStats(keys []string, acc plugins.Accumulator, tags map[string]string) {
engine := reflect.ValueOf(e).Elem()
for _, key := range keys {
acc.Add(
key,
engine.FieldByName(engineStats[key]).Interface(),
tags,
)
}
}

func (s *Storage) AddStats(acc plugins.Accumulator, tags map[string]string) {
acc.Add("cache_bytes_in_use", s.Cache.BytesInUse, tags)
acc.Add("disk_read_bytes_per_sec", s.Disk.ReadBytesPerSec, tags)
acc.Add("disk_read_bytes_total", s.Disk.ReadBytesTotal, tags)
acc.Add("disk_written_bytes_per_sec", s.Disk.WriteBytesPerSec, tags)
acc.Add("disk_written_bytes_total", s.Disk.WriteBytesTotal, tags)
acc.Add("disk_usage_data_bytes", s.Disk.SpaceUsage.Data, tags)
acc.Add("disk_usage_garbage_bytes", s.Disk.SpaceUsage.Garbage, tags)
acc.Add("disk_usage_metadata_bytes", s.Disk.SpaceUsage.Metadata, tags)
acc.Add("disk_usage_preallocated_bytes", s.Disk.SpaceUsage.Prealloc, tags)
}
112 changes: 112 additions & 0 deletions plugins/rethinkdb/rethinkdb_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package rethinkdb

import (
"testing"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

var tags = make(map[string]string)

func TestAddEngineStats(t *testing.T) {
engine := &Engine{
ClientConns: 0,
ClientActive: 0,
QueriesPerSec: 0,
TotalQueries: 0,
ReadsPerSec: 0,
TotalReads: 0,
WritesPerSec: 0,
TotalWrites: 0,
}

var acc testutil.Accumulator

keys := []string{
"active_clients",
"clients",
"queries_per_sec",
"total_queries",
"read_docs_per_sec",
"total_reads",
"written_docs_per_sec",
"total_writes",
}
engine.AddEngineStats(keys, &acc, tags)

for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric))
}
}

func TestAddEngineStatsPartial(t *testing.T) {
engine := &Engine{
ClientConns: 0,
ClientActive: 0,
QueriesPerSec: 0,
ReadsPerSec: 0,
WritesPerSec: 0,
}

var acc testutil.Accumulator

keys := []string{
"active_clients",
"clients",
"queries_per_sec",
"read_docs_per_sec",
"written_docs_per_sec",
}

missing_keys := []string{
"total_queries",
"total_reads",
"total_writes",
}
engine.AddEngineStats(keys, &acc, tags)

for _, metric := range missing_keys {
assert.False(t, acc.HasIntValue(metric))
}
}

func TestAddStorageStats(t *testing.T) {
storage := &Storage{
Cache: Cache{
BytesInUse: 0,
},
Disk: Disk{
ReadBytesPerSec: 0,
ReadBytesTotal: 0,
WriteBytesPerSec: 0,
WriteBytesTotal: 0,
SpaceUsage: SpaceUsage{
Data: 0,
Garbage: 0,
Metadata: 0,
Prealloc: 0,
},
},
}

var acc testutil.Accumulator

keys := []string{
"cache_bytes_in_use",
"disk_read_bytes_per_sec",
"disk_read_bytes_total",
"disk_written_bytes_per_sec",
"disk_written_bytes_total",
"disk_usage_data_bytes",
"disk_usage_garbage_bytes",
"disk_usage_metadata_bytes",
"disk_usage_preallocated_bytes",
}

storage.AddStats(&acc, tags)

for _, metric := range keys {
assert.True(t, acc.HasIntValue(metric))
}
}
Loading