-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mesos input: Collect framework_offers and allocator metrics #5719
Changes from 1 commit
e0a1d51
697073e
effb993
1ee1789
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package mesos | |
import ( | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io/ioutil" | ||
"log" | ||
"net" | ||
|
@@ -41,8 +42,80 @@ type Mesos struct { | |
slaveURLs []*url.URL | ||
} | ||
|
||
// TaggedField allows us to create a predictable hash from each unique | ||
// combination of tags | ||
type TaggedField struct { | ||
FrameworkName string | ||
CallType string | ||
EventType string | ||
OperationType string | ||
TaskState string | ||
RoleName string | ||
FieldName string | ||
Resource string | ||
Value interface{} | ||
} | ||
|
||
func (tf TaggedField) hash() string { | ||
buffer := "tf" | ||
|
||
if tf.FrameworkName != "" { | ||
buffer += "_fn:" + tf.FrameworkName | ||
} | ||
if tf.CallType != "" { | ||
buffer += "_ct:" + tf.CallType | ||
} | ||
if tf.EventType != "" { | ||
buffer += "_et:" + tf.EventType | ||
} | ||
if tf.OperationType != "" { | ||
buffer += "_ot:" + tf.OperationType | ||
} | ||
if tf.TaskState != "" { | ||
buffer += "_ts:" + tf.TaskState | ||
} | ||
if tf.RoleName != "" { | ||
buffer += "_rn:" + tf.RoleName | ||
} | ||
if tf.Resource != "" { | ||
buffer += "_r:" + tf.Resource | ||
} | ||
|
||
return buffer | ||
} | ||
|
||
type fieldTags map[string]string | ||
|
||
func (tf TaggedField) tags() fieldTags { | ||
tags := fieldTags{} | ||
|
||
if tf.FrameworkName != "" { | ||
tags["framework_name"] = tf.FrameworkName | ||
} | ||
if tf.CallType != "" { | ||
tags["call_type"] = tf.CallType | ||
} | ||
if tf.EventType != "" { | ||
tags["event_type"] = tf.EventType | ||
} | ||
if tf.OperationType != "" { | ||
tags["operation_type"] = tf.OperationType | ||
} | ||
if tf.TaskState != "" { | ||
tags["task_state"] = tf.TaskState | ||
} | ||
if tf.RoleName != "" { | ||
tags["role_name"] = tf.RoleName | ||
} | ||
if tf.Resource != "" { | ||
tags["resource"] = tf.Resource | ||
} | ||
|
||
return tags | ||
} | ||
|
||
var allMetrics = map[Role][]string{ | ||
MASTER: {"resources", "master", "system", "agents", "frameworks", "tasks", "messages", "evqueue", "registrar"}, | ||
MASTER: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"}, | ||
SLAVE: {"resources", "agent", "system", "executors", "tasks", "messages"}, | ||
} | ||
|
||
|
@@ -58,10 +131,12 @@ var sampleConfig = ` | |
"system", | ||
"agents", | ||
"frameworks", | ||
"framework_offers", | ||
"tasks", | ||
"messages", | ||
"evqueue", | ||
"registrar", | ||
"allocator", | ||
] | ||
## A list of Mesos slaves, default is [] | ||
# slaves = [] | ||
|
@@ -315,6 +390,12 @@ func getMetrics(role Role, group string) []string { | |
"master/outstanding_offers", | ||
} | ||
|
||
// These groups are empty because filtering is done in gatherMainMetrics | ||
// based on presence of "framework_offers"/"allocator" in MasterCols. | ||
// These lines are included to prevent the "unknown" info log below. | ||
m["framework_offers"] = []string{} | ||
m["allocator"] = []string{} | ||
|
||
m["tasks"] = []string{ | ||
"master/tasks_error", | ||
"master/tasks_failed", | ||
|
@@ -593,11 +674,141 @@ func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulato | |
} | ||
} | ||
|
||
var includeFrameworkOffers, includeAllocator bool | ||
for _, col := range m.MasterCols { | ||
if col == "framework_offers" { | ||
includeFrameworkOffers = true | ||
} else if col == "allocator" { | ||
includeAllocator = true | ||
} | ||
} | ||
|
||
taggedFields := map[string][]TaggedField{} | ||
extraTags := map[string]fieldTags{} | ||
|
||
for metricName, val := range jf.Fields { | ||
if !strings.HasPrefix(metricName, "master/frameworks/") && !strings.HasPrefix(metricName, "allocator/") { | ||
continue | ||
} | ||
|
||
// filter out framework offers/allocator metrics if necessary | ||
if (!includeFrameworkOffers && strings.HasPrefix(metricName, "master/frameworks/")) || | ||
(!includeAllocator && strings.HasPrefix(metricName, "allocator/")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic ended up a bit too complex for my taste, can you move the filtering into
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pushed 1ee1789. This required a refactor to the test module: I had to move |
||
delete(jf.Fields, metricName) | ||
continue | ||
} | ||
|
||
parts := strings.Split(metricName, "/") | ||
if (parts[0] == "master" && len(parts) < 5) || (parts[0] == "allocator" && len(parts) <= 5) { | ||
// All framework offers metrics have at least 5 parts. | ||
// All allocator metrics with <= 5 parts can be sent as is and does not pull | ||
// any params out into tags. | ||
// (e.g. allocator/mesos/allocation_run_ms/count vs allocator/mesos/roles/<role>/shares/dominant) | ||
continue | ||
} | ||
|
||
tf := generateTaggedField(parts) | ||
tf.Value = val | ||
|
||
if len(tf.tags()) == 0 { | ||
// indicates no extra tags were added | ||
continue | ||
} | ||
|
||
tfh := tf.hash() | ||
if _, ok := taggedFields[tfh]; !ok { | ||
taggedFields[tfh] = []TaggedField{} | ||
} | ||
taggedFields[tfh] = append(taggedFields[tfh], tf) | ||
|
||
if _, ok := extraTags[tfh]; !ok { | ||
extraTags[tfh] = tf.tags() | ||
} | ||
|
||
delete(jf.Fields, metricName) | ||
} | ||
|
||
acc.AddFields("mesos", jf.Fields, tags) | ||
|
||
for tfh, tfs := range taggedFields { | ||
fields := map[string]interface{}{} | ||
for _, tf := range tfs { | ||
fields[tf.FieldName] = tf.Value | ||
} | ||
for k, v := range tags { | ||
extraTags[tfh][k] = v | ||
} | ||
|
||
acc.AddFields("mesos", fields, extraTags[tfh]) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func generateTaggedField(parts []string) TaggedField { | ||
tf := TaggedField{} | ||
|
||
if parts[0] == "master" { | ||
tf.FrameworkName = parts[2] | ||
if len(parts) == 5 { | ||
// e.g. /master/frameworks/calls_total | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s_total", parts[0], parts[1], parts[4]) | ||
} else { | ||
switch parts[4] { | ||
case "offers": | ||
// e.g. /master/frameworks/offers/sent | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5]) | ||
case "calls": | ||
// e.g. /master/frameworks/calls/decline | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) | ||
tf.CallType = parts[5] | ||
case "events": | ||
// e.g. /master/frameworks/events/heartbeat | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) | ||
tf.EventType = parts[5] | ||
case "operations": | ||
// e.g. /master/frameworks/operations/create | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) | ||
tf.OperationType = parts[5] | ||
case "tasks": | ||
// e.g. /master/frameworks/tasks/active/running | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5]) | ||
tf.TaskState = parts[6] | ||
case "roles": | ||
// e.g. /master/frameworks/roles/public | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[6]) | ||
tf.RoleName = parts[5] | ||
default: | ||
// default to excluding framework name and id, but otherwise leaving path as is | ||
log.Printf("I! Unexpected metric name %s", parts[4]) | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], strings.Join(parts[4:], "/")) | ||
} | ||
} | ||
} else if parts[0] == "allocator" { | ||
switch parts[2] { | ||
case "roles": | ||
// e.g. /allocator/roles/shares/dominant | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[4], parts[5]) | ||
tf.RoleName = parts[3] | ||
case "offer_filters": | ||
// e.g. /allocator/offer_filters/roles/active | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5]) | ||
tf.RoleName = parts[4] | ||
case "quota": | ||
// e.g. /allocator/quota/roles/resources/offered_or_allocated | ||
tf.FieldName = fmt.Sprintf("%s/%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5], parts[7]) | ||
tf.RoleName = parts[4] | ||
tf.Resource = parts[6] | ||
default: | ||
// default to leaving path as is | ||
log.Printf("I! Unexpected metric name %s", parts[2]) | ||
tf.FieldName = strings.Join(parts, "/") | ||
} | ||
} | ||
|
||
return tf | ||
} | ||
|
||
func init() { | ||
inputs.Add("mesos", func() telegraf.Input { | ||
return &Mesos{} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we don't list the metrics here and remove the logic in gatherMainMetrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allocator and framework metric names may include values that can't be predicted, such as framework ID and role (docs here and here). That prevents us from listing all the possible allocator and framework metrics here.