Skip to content

Commit

Permalink
Add input plugin for McRouter (influxdata#4077)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthayer authored and otherpirate committed Mar 15, 2019
1 parent 0c0e5fd commit d20e253
Show file tree
Hide file tree
Showing 4 changed files with 640 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/mcrouter"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mesos"
_ "github.com/influxdata/telegraf/plugins/inputs/minecraft"
Expand Down
103 changes: 103 additions & 0 deletions plugins/inputs/mcrouter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Mcrouter Input Plugin

This plugin gathers statistics data from a Mcrouter server.

### Configuration:

```toml
# Read metrics from one or many mcrouter servers.
[[inputs.mcrouter]]
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]

## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
```

### Measurements & Fields:

The fields from this plugin are gathered in the *mcrouter* measurement.

Description of gathered fields can be found [here](https://github.com/facebook/mcrouter/wiki/Stats-list).

Fields:

* uptime
* num_servers
* num_servers_new
* num_servers_up
* num_servers_down
* num_servers_closed
* num_clients
* num_suspect_servers
* destination_batches_sum
* destination_requests_sum
* outstanding_route_get_reqs_queued
* outstanding_route_update_reqs_queued
* outstanding_route_get_avg_queue_size
* outstanding_route_update_avg_queue_size
* outstanding_route_get_avg_wait_time_sec
* outstanding_route_update_avg_wait_time_sec
* retrans_closed_connections
* destination_pending_reqs
* destination_inflight_reqs
* destination_batch_size
* asynclog_requests
* proxy_reqs_processing
* proxy_reqs_waiting
* client_queue_notify_period
* rusage_system
* rusage_user
* ps_num_minor_faults
* ps_num_major_faults
* ps_user_time_sec
* ps_system_time_sec
* ps_vsize
* ps_rss
* fibers_allocated
* fibers_pool_size
* fibers_stack_high_watermark
* successful_client_connections
* duration_us
* destination_max_pending_reqs
* destination_max_inflight_reqs
* retrans_per_kbyte_max
* cmd_get_count
* cmd_delete_out
* cmd_lease_get
* cmd_set
* cmd_get_out_all
* cmd_get_out
* cmd_lease_set_count
* cmd_other_out_all
* cmd_lease_get_out
* cmd_set_count
* cmd_lease_set_out
* cmd_delete_count
* cmd_other
* cmd_delete
* cmd_get
* cmd_lease_set
* cmd_set_out
* cmd_lease_get_count
* cmd_other_out
* cmd_lease_get_out_all
* cmd_set_out_all
* cmd_other_count
* cmd_delete_out_all
* cmd_lease_set_out_all

### Tags:

* Mcrouter measurements have the following tags:
- server (the host name from which metrics are gathered)



### Example Output:

```
$ ./telegraf --config telegraf.conf --input-filter mcrouter --test
mcrouter,server=localhost:11211 uptime=166,num_servers=1,num_servers_new=1,num_servers_up=0,num_servers_down=0,num_servers_closed=0,num_clients=1,num_suspect_servers=0,destination_batches_sum=0,destination_requests_sum=0,outstanding_route_get_reqs_queued=0,outstanding_route_update_reqs_queued=0,outstanding_route_get_avg_queue_size=0,outstanding_route_update_avg_queue_size=0,outstanding_route_get_avg_wait_time_sec=0,outstanding_route_update_avg_wait_time_sec=0,retrans_closed_connections=0,destination_pending_reqs=0,destination_inflight_reqs=0,destination_batch_size=0,asynclog_requests=0,proxy_reqs_processing=1,proxy_reqs_waiting=0,client_queue_notify_period=0,rusage_system=0.040966,rusage_user=0.020483,ps_num_minor_faults=2490,ps_num_major_faults=11,ps_user_time_sec=0.02,ps_system_time_sec=0.04,ps_vsize=697741312,ps_rss=10563584,fibers_allocated=0,fibers_pool_size=0,fibers_stack_high_watermark=0,successful_client_connections=18,duration_us=0,destination_max_pending_reqs=0,destination_max_inflight_reqs=0,retrans_per_kbyte_max=0,cmd_get_count=0,cmd_delete_out=0,cmd_lease_get=0,cmd_set=0,cmd_get_out_all=0,cmd_get_out=0,cmd_lease_set_count=0,cmd_other_out_all=0,cmd_lease_get_out=0,cmd_set_count=0,cmd_lease_set_out=0,cmd_delete_count=0,cmd_other=0,cmd_delete=0,cmd_get=0,cmd_lease_set=0,cmd_set_out=0,cmd_lease_get_count=0,cmd_other_out=0,cmd_lease_get_out_all=0,cmd_set_out_all=0,cmd_other_count=0,cmd_delete_out_all=0,cmd_lease_set_out_all=0 1453831884664956455
```
286 changes: 286 additions & 0 deletions plugins/inputs/mcrouter/mcrouter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package mcrouter

import (
"bufio"
"context"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Mcrouter is a mcrouter plugin
type Mcrouter struct {
Servers []string
Timeout internal.Duration
}

// enum for statType
type statType int

const (
typeInt statType = iota
typeFloat statType = iota
)

var sampleConfig = `
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]
## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
`

var defaultTimeout = 5 * time.Second

var defaultServerURL = url.URL{
Scheme: "tcp",
Host: "localhost:11211",
}

// The list of metrics that should be sent
var sendMetrics = map[string]statType{
"uptime": typeInt,
"num_servers": typeInt,
"num_servers_new": typeInt,
"num_servers_up": typeInt,
"num_servers_down": typeInt,
"num_servers_closed": typeInt,
"num_clients": typeInt,
"num_suspect_servers": typeInt,
"destination_batches_sum": typeInt,
"destination_requests_sum": typeInt,
"outstanding_route_get_reqs_queued": typeInt,
"outstanding_route_update_reqs_queued": typeInt,
"outstanding_route_get_avg_queue_size": typeInt,
"outstanding_route_update_avg_queue_size": typeInt,
"outstanding_route_get_avg_wait_time_sec": typeInt,
"outstanding_route_update_avg_wait_time_sec": typeInt,
"retrans_closed_connections": typeInt,
"destination_pending_reqs": typeInt,
"destination_inflight_reqs": typeInt,
"destination_batch_size": typeInt,
"asynclog_requests": typeInt,
"proxy_reqs_processing": typeInt,
"proxy_reqs_waiting": typeInt,
"client_queue_notify_period": typeInt,
"rusage_system": typeFloat,
"rusage_user": typeFloat,
"ps_num_minor_faults": typeInt,
"ps_num_major_faults": typeInt,
"ps_user_time_sec": typeFloat,
"ps_system_time_sec": typeFloat,
"ps_vsize": typeInt,
"ps_rss": typeInt,
"fibers_allocated": typeInt,
"fibers_pool_size": typeInt,
"fibers_stack_high_watermark": typeInt,
"successful_client_connections": typeInt,
"duration_us": typeInt,
"destination_max_pending_reqs": typeInt,
"destination_max_inflight_reqs": typeInt,
"retrans_per_kbyte_max": typeInt,
"cmd_get_count": typeInt,
"cmd_delete_out": typeInt,
"cmd_lease_get": typeInt,
"cmd_set": typeInt,
"cmd_get_out_all": typeInt,
"cmd_get_out": typeInt,
"cmd_lease_set_count": typeInt,
"cmd_other_out_all": typeInt,
"cmd_lease_get_out": typeInt,
"cmd_set_count": typeInt,
"cmd_lease_set_out": typeInt,
"cmd_delete_count": typeInt,
"cmd_other": typeInt,
"cmd_delete": typeInt,
"cmd_get": typeInt,
"cmd_lease_set": typeInt,
"cmd_set_out": typeInt,
"cmd_lease_get_count": typeInt,
"cmd_other_out": typeInt,
"cmd_lease_get_out_all": typeInt,
"cmd_set_out_all": typeInt,
"cmd_other_count": typeInt,
"cmd_delete_out_all": typeInt,
"cmd_lease_set_out_all": typeInt,
}

// SampleConfig returns sample configuration message
func (m *Mcrouter) SampleConfig() string {
return sampleConfig
}

// Description returns description of Mcrouter plugin
func (m *Mcrouter) Description() string {
return "Read metrics from one or many mcrouter servers"
}

// Gather reads stats from all configured servers accumulates stats
func (m *Mcrouter) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()

if m.Timeout.Duration < 1*time.Second {
m.Timeout.Duration = defaultTimeout
}

ctx, cancel := context.WithTimeout(ctx, m.Timeout.Duration)
defer cancel()

if len(m.Servers) == 0 {
m.Servers = []string{defaultServerURL.String()}
}

for _, serverAddress := range m.Servers {
acc.AddError(m.gatherServer(ctx, serverAddress, acc))
}

return nil
}

// ParseAddress parses an address string into 'host:port' and 'protocol' parts
func (m *Mcrouter) ParseAddress(address string) (string, string, error) {
var protocol string
var host string
var port string

u, parseError := url.Parse(address)

if parseError != nil {
return "", "", fmt.Errorf("Invalid server address")
}

if u.Scheme != "tcp" && u.Scheme != "unix" {
return "", "", fmt.Errorf("Invalid server protocol")
}

protocol = u.Scheme

if protocol == "unix" {
if u.Path == "" {
return "", "", fmt.Errorf("Invalid unix socket path")
}

address = u.Path
} else {
if u.Host == "" {
return "", "", fmt.Errorf("Invalid host")
}

host = u.Hostname()
port = u.Port()

if host == "" {
host = defaultServerURL.Hostname()
}

if port == "" {
port = defaultServerURL.Port()
}

address = host + ":" + port
}

return address, protocol, nil
}

func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error {
var conn net.Conn
var err error
var protocol string
var dialer net.Dialer

address, protocol, err = m.ParseAddress(address)

conn, err = dialer.DialContext(ctx, protocol, address)

if err != nil {
return err
}

defer conn.Close()

// Extend connection
deadline, ok := ctx.Deadline()

if ok {
conn.SetDeadline(deadline)
}

// Read and write buffer
reader := bufio.NewReader(conn)
scanner := bufio.NewScanner(reader)

// Send command
if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil {
return err
}

values, err := parseResponse(scanner)

if err != nil {
return err
}

// Add server address as a tag
tags := map[string]string{"server": address}

// Process values
fields := make(map[string]interface{})
for key, sType := range sendMetrics {
if value, ok := values[key]; ok {
switch sType {
case typeInt:
if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil {
fields[key] = v
}
case typeFloat:
if v, errParse := strconv.ParseFloat(value, 64); errParse == nil {
fields[key] = v
}
default:
}
}
}
acc.AddFields("mcrouter", fields, tags)
return nil
}

func parseResponse(r *bufio.Scanner) (map[string]string, error) {
values := make(map[string]string)

for r.Scan() {
// Read line
line := r.Text()

// Done
if line == "END" {
break
}

// Read values
s := strings.SplitN(line, " ", 3)

if len(s) != 3 || s[0] != "STAT" {
return nil, fmt.Errorf("unexpected line in stats response: %s", line)
}

// Save values
values[s[1]] = s[2]
}

return values, nil
}

func init() {
inputs.Add("mcrouter", func() telegraf.Input {
return &Mcrouter{}
})
}
Loading

0 comments on commit d20e253

Please sign in to comment.