Skip to content

Commit

Permalink
Don't parse framework and allocator metric names
Browse files Browse the repository at this point in the history
This restores framework and allocator metrics to their original names,
without extracting parts into tags.
  • Loading branch information
branden committed Aug 2, 2019
1 parent 697073e commit effb993
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 418 deletions.
188 changes: 4 additions & 184 deletions plugins/inputs/mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package mesos
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
Expand Down Expand Up @@ -42,78 +41,6 @@ type Mesos struct {
slaveURLs []*url.URL
}

// TaggedField allows us to create a predictable hash from each unique
// combination of tags
type TaggedField struct {
FrameworkName string
CallType string
EventType string
OperationType string
TaskState string
RoleName string
FieldName string
Resource string
Value interface{}
}

func (tf TaggedField) hash() string {
buffer := "tf"

if tf.FrameworkName != "" {
buffer += "_fn:" + tf.FrameworkName
}
if tf.CallType != "" {
buffer += "_ct:" + tf.CallType
}
if tf.EventType != "" {
buffer += "_et:" + tf.EventType
}
if tf.OperationType != "" {
buffer += "_ot:" + tf.OperationType
}
if tf.TaskState != "" {
buffer += "_ts:" + tf.TaskState
}
if tf.RoleName != "" {
buffer += "_rn:" + tf.RoleName
}
if tf.Resource != "" {
buffer += "_r:" + tf.Resource
}

return buffer
}

type fieldTags map[string]string

func (tf TaggedField) tags() fieldTags {
tags := fieldTags{}

if tf.FrameworkName != "" {
tags["framework_name"] = tf.FrameworkName
}
if tf.CallType != "" {
tags["call_type"] = tf.CallType
}
if tf.EventType != "" {
tags["event_type"] = tf.EventType
}
if tf.OperationType != "" {
tags["operation_type"] = tf.OperationType
}
if tf.TaskState != "" {
tags["task_state"] = tf.TaskState
}
if tf.RoleName != "" {
tags["role_name"] = tf.RoleName
}
if tf.Resource != "" {
tags["resource"] = tf.Resource
}

return tags
}

var allMetrics = map[Role][]string{
MASTER: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"},
SLAVE: {"resources", "agent", "system", "executors", "tasks", "messages"},
Expand Down Expand Up @@ -703,132 +630,25 @@ func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulato
}
}

taggedFields := map[string][]TaggedField{}
extraTags := map[string]fieldTags{}

for metricName, val := range jf.Fields {
if !strings.HasPrefix(metricName, "master/frameworks/") && !strings.HasPrefix(metricName, "allocator/") {
for metricName := range jf.Fields {
if !strings.HasPrefix(metricName, "master/frameworks/") && !strings.HasPrefix(metricName, "frameworks/") && !strings.HasPrefix(metricName, "allocator/") {
continue
}

// filter out framework offers/allocator metrics if necessary
if (!includeFrameworkOffers && strings.HasPrefix(metricName, "master/frameworks/")) ||
if !includeFrameworkOffers &&
(strings.HasPrefix(metricName, "master/frameworks/") || strings.HasPrefix(metricName, "frameworks/")) ||
(!includeAllocator && strings.HasPrefix(metricName, "allocator/")) {
delete(jf.Fields, metricName)
continue
}

parts := strings.Split(metricName, "/")
if (parts[0] == "master" && len(parts) < 5) || (parts[0] == "allocator" && len(parts) <= 5) {
// All framework offers metrics have at least 5 parts.
// All allocator metrics with <= 5 parts can be sent as is and does not pull
// any params out into tags.
// (e.g. allocator/mesos/allocation_run_ms/count vs allocator/mesos/roles/<role>/shares/dominant)
continue
}

tf := generateTaggedField(parts)
tf.Value = val

if len(tf.tags()) == 0 {
// indicates no extra tags were added
continue
}

tfh := tf.hash()
if _, ok := taggedFields[tfh]; !ok {
taggedFields[tfh] = []TaggedField{}
}
taggedFields[tfh] = append(taggedFields[tfh], tf)

if _, ok := extraTags[tfh]; !ok {
extraTags[tfh] = tf.tags()
}

delete(jf.Fields, metricName)
}

acc.AddFields("mesos", jf.Fields, tags)

for tfh, tfs := range taggedFields {
fields := map[string]interface{}{}
for _, tf := range tfs {
fields[tf.FieldName] = tf.Value
}
for k, v := range tags {
extraTags[tfh][k] = v
}

acc.AddFields("mesos", fields, extraTags[tfh])
}

return nil
}

func generateTaggedField(parts []string) TaggedField {
tf := TaggedField{}

if parts[0] == "master" {
tf.FrameworkName = parts[2]
if len(parts) == 5 {
// e.g. /master/frameworks/calls_total
tf.FieldName = fmt.Sprintf("%s/%s/%s_total", parts[0], parts[1], parts[4])
} else {
switch parts[4] {
case "offers":
// e.g. /master/frameworks/offers/sent
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5])
case "calls":
// e.g. /master/frameworks/calls/decline
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4])
tf.CallType = parts[5]
case "events":
// e.g. /master/frameworks/events/heartbeat
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4])
tf.EventType = parts[5]
case "operations":
// e.g. /master/frameworks/operations/create
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4])
tf.OperationType = parts[5]
case "tasks":
// e.g. /master/frameworks/tasks/active/running
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5])
tf.TaskState = parts[6]
case "roles":
// e.g. /master/frameworks/roles/public
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[6])
tf.RoleName = parts[5]
default:
// default to excluding framework name and id, but otherwise leaving path as is
log.Printf("I! Unexpected metric name %s", parts[4])
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], strings.Join(parts[4:], "/"))
}
}
} else if parts[0] == "allocator" {
switch parts[2] {
case "roles":
// e.g. /allocator/roles/shares/dominant
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[4], parts[5])
tf.RoleName = parts[3]
case "offer_filters":
// e.g. /allocator/offer_filters/roles/active
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5])
tf.RoleName = parts[4]
case "quota":
// e.g. /allocator/quota/roles/resources/offered_or_allocated
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5], parts[7])
tf.RoleName = parts[4]
tf.Resource = parts[6]
default:
// default to leaving path as is
log.Printf("I! Unexpected metric name %s", parts[2])
tf.FieldName = strings.Join(parts, "/")
}
}

return tf
}

func init() {
inputs.Add("mesos", func() telegraf.Input {
return &Mesos{}
Expand Down
Loading

0 comments on commit effb993

Please sign in to comment.