From effb993e04d36aecdeaafbbc217134de11d4738c Mon Sep 17 00:00:00 2001 From: Branden Rolston Date: Thu, 1 Aug 2019 22:22:49 -0700 Subject: [PATCH] Don't parse framework and allocator metric names This restores framework and allocator metrics to their original names, without extracting parts into tags. --- plugins/inputs/mesos/mesos.go | 188 +---------------------- plugins/inputs/mesos/mesos_test.go | 235 +---------------------------- 2 files changed, 5 insertions(+), 418 deletions(-) diff --git a/plugins/inputs/mesos/mesos.go b/plugins/inputs/mesos/mesos.go index 2c4cb4028077a..44d27dc4921d1 100644 --- a/plugins/inputs/mesos/mesos.go +++ b/plugins/inputs/mesos/mesos.go @@ -3,7 +3,6 @@ package mesos import ( "encoding/json" "errors" - "fmt" "io/ioutil" "log" "net" @@ -42,78 +41,6 @@ type Mesos struct { slaveURLs []*url.URL } -// TaggedField allows us to create a predictable hash from each unique -// combination of tags -type TaggedField struct { - FrameworkName string - CallType string - EventType string - OperationType string - TaskState string - RoleName string - FieldName string - Resource string - Value interface{} -} - -func (tf TaggedField) hash() string { - buffer := "tf" - - if tf.FrameworkName != "" { - buffer += "_fn:" + tf.FrameworkName - } - if tf.CallType != "" { - buffer += "_ct:" + tf.CallType - } - if tf.EventType != "" { - buffer += "_et:" + tf.EventType - } - if tf.OperationType != "" { - buffer += "_ot:" + tf.OperationType - } - if tf.TaskState != "" { - buffer += "_ts:" + tf.TaskState - } - if tf.RoleName != "" { - buffer += "_rn:" + tf.RoleName - } - if tf.Resource != "" { - buffer += "_r:" + tf.Resource - } - - return buffer -} - -type fieldTags map[string]string - -func (tf TaggedField) tags() fieldTags { - tags := fieldTags{} - - if tf.FrameworkName != "" { - tags["framework_name"] = tf.FrameworkName - } - if tf.CallType != "" { - tags["call_type"] = tf.CallType - } - if tf.EventType != "" { - tags["event_type"] = tf.EventType - } - if tf.OperationType != "" { - tags["operation_type"] = tf.OperationType - } - if tf.TaskState != "" { - tags["task_state"] = tf.TaskState - } - if tf.RoleName != "" { - tags["role_name"] = tf.RoleName - } - if tf.Resource != "" { - tags["resource"] = tf.Resource - } - - return tags -} - var allMetrics = map[Role][]string{ MASTER: {"resources", "master", "system", "agents", "frameworks", "framework_offers", "tasks", "messages", "evqueue", "registrar", "allocator"}, SLAVE: {"resources", "agent", "system", "executors", "tasks", "messages"}, @@ -703,132 +630,25 @@ func (m *Mesos) gatherMainMetrics(u *url.URL, role Role, acc telegraf.Accumulato } } - taggedFields := map[string][]TaggedField{} - extraTags := map[string]fieldTags{} - - for metricName, val := range jf.Fields { - if !strings.HasPrefix(metricName, "master/frameworks/") && !strings.HasPrefix(metricName, "allocator/") { + for metricName := range jf.Fields { + if !strings.HasPrefix(metricName, "master/frameworks/") && !strings.HasPrefix(metricName, "frameworks/") && !strings.HasPrefix(metricName, "allocator/") { continue } // filter out framework offers/allocator metrics if necessary - if (!includeFrameworkOffers && strings.HasPrefix(metricName, "master/frameworks/")) || + if !includeFrameworkOffers && + (strings.HasPrefix(metricName, "master/frameworks/") || strings.HasPrefix(metricName, "frameworks/")) || (!includeAllocator && strings.HasPrefix(metricName, "allocator/")) { delete(jf.Fields, metricName) continue } - - parts := strings.Split(metricName, "/") - if (parts[0] == "master" && len(parts) < 5) || (parts[0] == "allocator" && len(parts) <= 5) { - // All framework offers metrics have at least 5 parts. - // All allocator metrics with <= 5 parts can be sent as is and does not pull - // any params out into tags. - // (e.g. allocator/mesos/allocation_run_ms/count vs allocator/mesos/roles//shares/dominant) - continue - } - - tf := generateTaggedField(parts) - tf.Value = val - - if len(tf.tags()) == 0 { - // indicates no extra tags were added - continue - } - - tfh := tf.hash() - if _, ok := taggedFields[tfh]; !ok { - taggedFields[tfh] = []TaggedField{} - } - taggedFields[tfh] = append(taggedFields[tfh], tf) - - if _, ok := extraTags[tfh]; !ok { - extraTags[tfh] = tf.tags() - } - - delete(jf.Fields, metricName) } acc.AddFields("mesos", jf.Fields, tags) - for tfh, tfs := range taggedFields { - fields := map[string]interface{}{} - for _, tf := range tfs { - fields[tf.FieldName] = tf.Value - } - for k, v := range tags { - extraTags[tfh][k] = v - } - - acc.AddFields("mesos", fields, extraTags[tfh]) - } - return nil } -func generateTaggedField(parts []string) TaggedField { - tf := TaggedField{} - - if parts[0] == "master" { - tf.FrameworkName = parts[2] - if len(parts) == 5 { - // e.g. /master/frameworks/calls_total - tf.FieldName = fmt.Sprintf("%s/%s/%s_total", parts[0], parts[1], parts[4]) - } else { - switch parts[4] { - case "offers": - // e.g. /master/frameworks/offers/sent - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5]) - case "calls": - // e.g. /master/frameworks/calls/decline - tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) - tf.CallType = parts[5] - case "events": - // e.g. /master/frameworks/events/heartbeat - tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) - tf.EventType = parts[5] - case "operations": - // e.g. /master/frameworks/operations/create - tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], parts[4]) - tf.OperationType = parts[5] - case "tasks": - // e.g. /master/frameworks/tasks/active/running - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[5]) - tf.TaskState = parts[6] - case "roles": - // e.g. /master/frameworks/roles/public - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[1], parts[4], parts[6]) - tf.RoleName = parts[5] - default: - // default to excluding framework name and id, but otherwise leaving path as is - log.Printf("I! Unexpected metric name %s", parts[4]) - tf.FieldName = fmt.Sprintf("%s/%s/%s", parts[0], parts[1], strings.Join(parts[4:], "/")) - } - } - } else if parts[0] == "allocator" { - switch parts[2] { - case "roles": - // e.g. /allocator/roles/shares/dominant - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[4], parts[5]) - tf.RoleName = parts[3] - case "offer_filters": - // e.g. /allocator/offer_filters/roles/active - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5]) - tf.RoleName = parts[4] - case "quota": - // e.g. /allocator/quota/roles/resources/offered_or_allocated - tf.FieldName = fmt.Sprintf("%s/%s/%s/%s/%s", parts[0], parts[2], parts[3], parts[5], parts[7]) - tf.RoleName = parts[4] - tf.Resource = parts[6] - default: - // default to leaving path as is - log.Printf("I! Unexpected metric name %s", parts[2]) - tf.FieldName = strings.Join(parts, "/") - } - } - - return tf -} - func init() { inputs.Add("mesos", func() telegraf.Input { return &Mesos{} diff --git a/plugins/inputs/mesos/mesos_test.go b/plugins/inputs/mesos/mesos_test.go index c33aa48dfa2c9..1e00dd307856d 100644 --- a/plugins/inputs/mesos/mesos_test.go +++ b/plugins/inputs/mesos/mesos_test.go @@ -8,11 +8,9 @@ import ( "net/http/httptest" "net/url" "os" - "strings" "testing" "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -203,9 +201,6 @@ func generateMetrics() { "allocator/mesos/allocation_run_latency_ms/p999", "allocator/mesos/allocation_run_latency_ms/p9999", "allocator/mesos/roles/*/shares/dominant", - // test case against hash collisions in TaggedFields - // e.g. framework_name=marathon and role_name=marathon - "allocator/mesos/roles/marathon/shares/dominant", "allocator/mesos/event_queue_dispatches", "allocator/mesos/offer_filters/roles/*/active", "allocator/mesos/quota/roles/*/resources/disk/offered_or_allocated", @@ -217,7 +212,6 @@ func generateMetrics() { "allocator/mesos/resources/disk/total", "allocator/mesos/resources/mem/offered_or_allocated", "allocator/mesos/resources/mem/total", - "allocator/mesos/unknown/unknown/unknown/unknown", // test case for unknown metric type } for _, k := range metricNames { @@ -364,166 +358,7 @@ func TestMesosMaster(t *testing.T) { t.Errorf(err.Error()) } - expectedUntaggedMetrics := map[string]interface{}{} - for k, v := range masterMetrics { - parts := strings.Split(k, "/") - if !strings.HasPrefix(k, "master/frameworks/") && (!strings.HasPrefix(k, "allocator/") || len(parts) <= 5) { - expectedUntaggedMetrics[k] = v - } - } - // for unknown allocator metric type test case, expect no additional tags - expectedUntaggedMetrics["allocator/mesos/unknown/unknown/unknown/unknown"] = masterMetrics["allocator/mesos/unknown/unknown/unknown/unknown"] - - acc.AssertContainsFields(t, "mesos", expectedUntaggedMetrics) - - frameworkFields := []map[string]interface{}{ - // framework offers - { - // "unknown" metric type should still contain framework_name tag - "master/frameworks/unknown/unknown": masterMetrics["master/frameworks/marathon/abc-123/unknown/unknown"], - "master/frameworks/calls_total": masterMetrics["master/frameworks/marathon/abc-123/calls"], - "master/frameworks/events_total": masterMetrics["master/frameworks/marathon/abc-123/events"], - "master/frameworks/operations_total": masterMetrics["master/frameworks/marathon/abc-123/operations"], - "master/frameworks/subscribed_total": masterMetrics["master/frameworks/marathon/abc-123/subscribed"], - "master/frameworks/offers/sent": masterMetrics["master/frameworks/marathon/abc-123/offers/sent"], - }, - { - "master/frameworks/tasks/active": masterMetrics["master/frameworks/marathon/abc-123/tasks/active/task_killing"], - }, - { - "master/frameworks/tasks/active": masterMetrics["master/frameworks/marathon/abc-123/tasks/active/task_dropped"], - "master/frameworks/tasks/terminal": masterMetrics["master/frameworks/marathon/abc-123/tasks/terminal/task_dropped"], - }, - { - "master/frameworks/roles/suppressed": masterMetrics["master/frameworks/marathon/abc-123/roles/*/suppressed"], - }, - { - "master/frameworks/calls": masterMetrics["master/frameworks/marathon/abc-123/calls/accept"], - }, - { - "master/frameworks/events": masterMetrics["master/frameworks/marathon/abc-123/events/error"], - }, - { - "master/frameworks/operations": masterMetrics["master/frameworks/marathon/abc-123/operations/create"], - }, - // allocator - { - "allocator/roles/shares/dominant": masterMetrics["allocator/mesos/roles/*/shares/dominant"], - "allocator/offer_filters/roles/active": masterMetrics["allocator/mesos/offer_filters/roles/*/active"], - }, - { - "allocator/roles/shares/dominant": masterMetrics["allocator/mesos/roles/marathon/shares/dominant"], - }, - { - "allocator/quota/roles/resources/offered_or_allocated": masterMetrics["allocator/mesos/quota/roles/*/resources/disk/offered_or_allocated"], - "allocator/quota/roles/resources/guarantee": masterMetrics["allocator/mesos/quota/roles/*/resources/disk/guarantee"], - }, - { - "allocator/quota/roles/resources/guarantee": masterMetrics["allocator/mesos/quota/roles/*/resources/mem/guarantee"], - }, - } - - frameworkTags := []map[string]string{ - // framework offers - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "task_state": "task_killing", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "task_state": "task_dropped", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "role_name": "*", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "call_type": "accept", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "event_type": "error", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "framework_name": "marathon", - "operation_type": "create", - }, - // allocator - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "role_name": "*", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "role_name": "marathon", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "role_name": "*", - "resource": "disk", - }, - { - "server": m.masterURLs[0].Hostname(), - "url": masterTestServer.URL, - "role": "master", - "state": "leader", - "role_name": "*", - "resource": "mem", - }, - } - - for i := 0; i < len(frameworkFields); i++ { - acc.AssertContainsTaggedFields(t, "mesos", frameworkFields[i], frameworkTags[i]) - // Test that none of the other metrics share the same tags, which - // tests against potential hash collisions in TaggedFields. - for j := 0; j < len(frameworkFields); j++ { - if j == i { - continue - } - acc.AssertDoesNotContainsTaggedFields(t, "mesos", frameworkFields[j], frameworkTags[i]) - } - } + acc.AssertContainsFields(t, "mesos", masterMetrics) } func TestMasterFilter(t *testing.T) { @@ -572,18 +407,6 @@ func TestMesosSlave(t *testing.T) { } acc.AssertContainsFields(t, "mesos", slaveMetrics) - - // expectedFields := make(map[string]interface{}, len(slaveTaskMetrics["statistics"].(map[string]interface{}))+1) - // for k, v := range slaveTaskMetrics["statistics"].(map[string]interface{}) { - // expectedFields[k] = v - // } - // expectedFields["executor_id"] = slaveTaskMetrics["executor_id"] - - // acc.AssertContainsTaggedFields( - // t, - // "mesos_tasks", - // expectedFields, - // map[string]string{"server": "127.0.0.1", "framework_id": slaveTaskMetrics["framework_id"].(string)}) } func TestSlaveFilter(t *testing.T) { @@ -629,59 +452,3 @@ func TestURLTagDoesNotModify(t *testing.T) { require.Equal(t, u.String(), "http://a:b@localhost:5051?timeout=1ms") require.Equal(t, v, "http://localhost:5051") } - -func TestTaggedFieldHash(t *testing.T) { - assert := assert.New(t) - tf := TaggedField{ - FrameworkName: "marathon", - CallType: "accept", - EventType: "error", - OperationType: "create", - TaskState: "active", - RoleName: "marathon", - FieldName: "field/name", - Resource: "mem", - Value: 1.0, - } - assert.Equal("tf_fn:marathon_ct:accept_et:error_ot:create_ts:active_rn:marathon_r:mem", tf.hash()) - - // Test against hash collisions - tf1 := TaggedField{ - FrameworkName: "marathon", - FieldName: "field/name/1", - Value: 1.0, - } - tf2 := TaggedField{ - RoleName: "marathon", - FieldName: "field/name/2", - Value: 1.0, - } - assert.NotEqual(tf1.hash(), tf2.hash()) -} - -func TestTaggedFieldTags(t *testing.T) { - assert := assert.New(t) - tf := TaggedField{ - FrameworkName: "marathon", - CallType: "accept", - EventType: "error", - OperationType: "create", - TaskState: "active", - RoleName: "marathon", - FieldName: "field/name", - Resource: "mem", - Value: 1.0, - } - - expectedTags := fieldTags{ - "framework_name": "marathon", - "call_type": "accept", - "event_type": "error", - "operation_type": "create", - "task_state": "active", - "role_name": "marathon", - "resource": "mem", - } - - assert.Equal(expectedTags, tf.tags()) -}