From f3bb1cacf45f162c65e705707763726774f639b6 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 22 Mar 2018 12:30:08 +0100 Subject: [PATCH] Metricbeat: Add option to Jolokia/JMX mappings to explicitely group metrics A new optional `event` option is added to attribute mappings, so attributes with the same `event` are sent in the same event to Elastic and attributes with different `event` are sent in different events. Some additional related refactorings done to improve type safety. --- CHANGELOG.asciidoc | 1 + .../module/jolokia/jmx/_meta/docs.asciidoc | 7 +- metricbeat/module/jolokia/jmx/config.go | 21 ++- metricbeat/module/jolokia/jmx/data.go | 71 ++++----- metricbeat/module/jolokia/jmx/data_test.go | 136 ++++++++++++++++-- metricbeat/module/jolokia/jmx/jmx.go | 2 +- 6 files changed, 190 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index be6a8289be4..52538adc0d1 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -257,6 +257,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Making the MongoDB module GA. {pull}6554[6554] - Allow to disable labels `dedot` in Docker module, in favor of a safe way to keep dots. {pull}6490[6490] - Add experimental module to collect metrics from munin nodes. {pull}6517[6517] +- Add support for wildcards and explicit metrics grouping in jolokia/jmx. {pull}6462[6462] *Packetbeat* diff --git a/metricbeat/module/jolokia/jmx/_meta/docs.asciidoc b/metricbeat/module/jolokia/jmx/_meta/docs.asciidoc index d294cfd743f..f328bd24907 100644 --- a/metricbeat/module/jolokia/jmx/_meta/docs.asciidoc +++ b/metricbeat/module/jolokia/jmx/_meta/docs.asciidoc @@ -22,6 +22,7 @@ mapping: attributes: - attr: Uptime field: uptime + event: uptime --- In case the underlying attribute is an object (e.g. see HeapMemoryUsage attribute in java.lang:type=Memory) its @@ -34,8 +35,10 @@ All metrics from a single mapping will be POSTed to the defined host/port and se To make it possible to differentiate between metrics from multiple similar applications running on the same host, please configure multiple modules. -When wildcards are used, an event will be sent to Elastic for each matching mbean, in that case a `mbean` field is -added. +When wildcards are used, an event will be sent to Elastic for each matching mbean, in that case a `mbean` field is added. + +Optionally, an `event` name can be added to each attribute, this makes all metrics with the same `event` +to be grouped in the same event when being sent to Elastic. It is required to set a namespace in the general module config section. diff --git a/metricbeat/module/jolokia/jmx/config.go b/metricbeat/module/jolokia/jmx/config.go index a711f3059d6..e2d2bbb3334 100644 --- a/metricbeat/module/jolokia/jmx/config.go +++ b/metricbeat/module/jolokia/jmx/config.go @@ -10,6 +10,7 @@ type JMXMapping struct { type Attribute struct { Attr string Field string + Event string } // RequestBlock is used to build the request blocks of the following format: @@ -37,8 +38,22 @@ type RequestBlock struct { Attribute []string `json:"attribute"` } -func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]string, error) { - responseMapping := map[string]string{} +type attributeMappingKey struct { + mbean, attr string +} + +// AttributeMapping contains the mapping information between attributes in Jolokia +// responses and fields in metricbeat events +type AttributeMapping map[attributeMappingKey]Attribute + +// Get the mapping options for the attribute of an mbean +func (m AttributeMapping) Get(mbean, attr string) (Attribute, bool) { + a, found := m[attributeMappingKey{mbean, attr}] + return a, found +} + +func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, AttributeMapping, error) { + responseMapping := make(AttributeMapping) var blocks []RequestBlock for _, mapping := range mappings { @@ -49,7 +64,7 @@ func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]strin for _, attribute := range mapping.Attributes { rb.Attribute = append(rb.Attribute, attribute.Attr) - responseMapping[mapping.MBean+"_"+attribute.Attr] = attribute.Field + responseMapping[attributeMappingKey{mapping.MBean, attribute.Attr}] = attribute } blocks = append(blocks, rb) } diff --git a/metricbeat/module/jolokia/jmx/data.go b/metricbeat/module/jolokia/jmx/data.go index c3b1c154260..dce3aeca47d 100644 --- a/metricbeat/module/jolokia/jmx/data.go +++ b/metricbeat/module/jolokia/jmx/data.go @@ -74,26 +74,26 @@ type Entry struct { // "status": 200, // } // } -func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, error) { +type eventKey struct { + mbean, event string +} + +func eventMapping(content []byte, mapping AttributeMapping) ([]common.MapStr, error) { var entries []Entry if err := json.Unmarshal(content, &entries); err != nil { return nil, errors.Wrapf(err, "failed to unmarshal jolokia JSON response '%v'", string(content)) } // Generate a different event for each wildcard mbean, and and additional one - // for non-wildcard requested mbeans - mbeanEvents := make(map[string]common.MapStr) + // for non-wildcard requested mbeans, group them by event name if defined + mbeanEvents := make(map[eventKey]common.MapStr) var errs multierror.Errors for _, v := range entries { + hasWildcard := strings.Contains(v.Request.Mbean, "*") for attribute, value := range v.Value { - if !strings.Contains(v.Request.Mbean, "*") { - event, found := mbeanEvents[""] - if !found { - event = common.MapStr{} - mbeanEvents[""] = event - } - err := parseResponseEntry(v.Request.Mbean, attribute, value, event, mapping) + if !hasWildcard { + err := parseResponseEntry(v.Request.Mbean, v.Request.Mbean, attribute, value, mbeanEvents, mapping) if err != nil { errs = append(errs, err) } @@ -110,15 +110,8 @@ func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, e } responseMbean := attribute - event, found := mbeanEvents[responseMbean] - if !found { - event = common.MapStr{} - event.Put(mbeanEventKey, responseMbean) - mbeanEvents[responseMbean] = event - } - for attribute, value := range values { - err := parseResponseEntry(v.Request.Mbean, attribute, value, event, mapping) + err := parseResponseEntry(v.Request.Mbean, responseMbean, attribute, value, mbeanEvents, mapping) if err != nil { errs = append(errs, err) } @@ -134,34 +127,48 @@ func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, e return events, errs.Err() } +func selectEvent(events map[eventKey]common.MapStr, key eventKey) common.MapStr { + event, found := events[key] + if !found { + event = common.MapStr{} + if key.mbean != "" { + event.Put(mbeanEventKey, key.mbean) + } + events[key] = event + } + return event +} + func parseResponseEntry( requestMbeanName string, + responseMbeanName string, attributeName string, - attibuteValue interface{}, - event common.MapStr, - mapping map[string]string, + attributeValue interface{}, + events map[eventKey]common.MapStr, + mapping AttributeMapping, ) error { - // Create metric name by merging mbean and attribute fields. - var metricName = requestMbeanName + "_" + attributeName - - key, exists := mapping[metricName] + field, exists := mapping.Get(requestMbeanName, attributeName) if !exists { - return errors.Errorf("metric key '%v' not found in response (%+v)", metricName, mapping) + return errors.Errorf("metric key '%v' not found in response (%+v)", attributeName, mapping) } - var err error + var key eventKey + key.event = field.Event + if responseMbeanName != requestMbeanName { + key.mbean = responseMbeanName + } + event := selectEvent(events, key) // In case the attributeValue is a map the keys are dedotted - c, ok := attibuteValue.(map[string]interface{}) + data := attributeValue + c, ok := data.(map[string]interface{}) if ok { newData := map[string]interface{}{} for k, v := range c { newData[common.DeDot(k)] = v } - _, err = event.Put(key, newData) - } else { - _, err = event.Put(key, attibuteValue) + data = newData } - + _, err := event.Put(field.Field, data) return err } diff --git a/metricbeat/module/jolokia/jmx/data_test.go b/metricbeat/module/jolokia/jmx/data_test.go index e301848d742..06b5e992471 100644 --- a/metricbeat/module/jolokia/jmx/data_test.go +++ b/metricbeat/module/jolokia/jmx/data_test.go @@ -20,13 +20,19 @@ func TestEventMapper(t *testing.T) { assert.Nil(t, err) - var mapping = map[string]string{ - "java.lang:type=Runtime_Uptime": "uptime", - "java.lang:type=GarbageCollector,name=ConcurrentMarkSweep_CollectionTime": "gc.cms_collection_time", - "java.lang:type=GarbageCollector,name=ConcurrentMarkSweep_CollectionCount": "gc.cms_collection_count", - "java.lang:type=Memory_HeapMemoryUsage": "memory.heap_usage", - "java.lang:type=Memory_NonHeapMemoryUsage": "memory.non_heap_usage", - "org.springframework.boot:type=Endpoint,name=metricsEndpoint_Metrics": "metrics", + var mapping = AttributeMapping{ + attributeMappingKey{"java.lang:type=Runtime", "Uptime"}: Attribute{ + Attr: "Uptime", Field: "uptime"}, + attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionTime"}: Attribute{ + Attr: "CollectionTime", Field: "gc.cms_collection_time"}, + attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionCount"}: Attribute{ + Attr: "CollectionCount", Field: "gc.cms_collection_count"}, + attributeMappingKey{"java.lang:type=Memory", "HeapMemoryUsage"}: Attribute{ + Attr: "HeapMemoryUsage", Field: "memory.heap_usage"}, + attributeMappingKey{"java.lang:type=Memory", "NonHeapMemoryUsage"}: Attribute{ + Attr: "NonHEapMemoryUsage", Field: "memory.non_heap_usage"}, + attributeMappingKey{"org.springframework.boot:type=Endpoint,name=metricsEndpoint", "Metrics"}: Attribute{ + Attr: "Metrics", Field: "metrics"}, } events, err := eventMapping(jolokiaResponse, mapping) @@ -65,6 +71,71 @@ func TestEventMapper(t *testing.T) { assert.ElementsMatch(t, expected, events) } +func TestEventGroupingMapper(t *testing.T) { + absPath, err := filepath.Abs("./_meta/test") + + assert.NotNil(t, absPath) + assert.Nil(t, err) + + jolokiaResponse, err := ioutil.ReadFile(absPath + "/jolokia_response.json") + + assert.Nil(t, err) + + var mapping = AttributeMapping{ + attributeMappingKey{"java.lang:type=Runtime", "Uptime"}: Attribute{ + Attr: "Uptime", Field: "uptime"}, + attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionTime"}: Attribute{ + Attr: "CollectionTime", Field: "gc.cms_collection_time", Event: "gc"}, + attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionCount"}: Attribute{ + Attr: "CollectionCount", Field: "gc.cms_collection_count", Event: "gc"}, + attributeMappingKey{"java.lang:type=Memory", "HeapMemoryUsage"}: Attribute{ + Attr: "HeapMemoryUsage", Field: "memory.heap_usage", Event: "memory"}, + attributeMappingKey{"java.lang:type=Memory", "NonHeapMemoryUsage"}: Attribute{ + Attr: "NonHEapMemoryUsage", Field: "memory.non_heap_usage", Event: "memory"}, + attributeMappingKey{"org.springframework.boot:type=Endpoint,name=metricsEndpoint", "Metrics"}: Attribute{ + Attr: "Metrics", Field: "metrics"}, + } + + events, err := eventMapping(jolokiaResponse, mapping) + assert.Nil(t, err) + + expected := []common.MapStr{ + { + "uptime": float64(47283), + "metrics": map[string]interface{}{ + "atomikos_nbTransactions": float64(0), + "classes": float64(18857), + "classes_loaded": float64(19127), + "classes_unloaded": float64(270), + }, + }, + { + "gc": common.MapStr{ + "cms_collection_time": float64(53), + "cms_collection_count": float64(1), + }, + }, + { + "memory": common.MapStr{ + "heap_usage": map[string]interface{}{ + "init": float64(1073741824), + "committed": float64(1037959168), + "max": float64(1037959168), + "used": float64(227420472), + }, + "non_heap_usage": map[string]interface{}{ + "init": float64(2555904), + "committed": float64(53477376), + "max": float64(-1), + "used": float64(50519768), + }, + }, + }, + } + + assert.ElementsMatch(t, expected, events) +} + func TestEventMapperWithWildcard(t *testing.T) { absPath, err := filepath.Abs("./_meta/test") @@ -75,9 +146,11 @@ func TestEventMapperWithWildcard(t *testing.T) { assert.Nil(t, err) - var mapping = map[string]string{ - "Catalina:name=*,type=ThreadPool_port": "port", - "Catalina:name=*,type=ThreadPool_maxConnections": "max_connections", + var mapping = AttributeMapping{ + attributeMappingKey{"Catalina:name=*,type=ThreadPool", "port"}: Attribute{ + Attr: "port", Field: "port"}, + attributeMappingKey{"Catalina:name=*,type=ThreadPool", "maxConnections"}: Attribute{ + Attr: "maxConnections", Field: "max_connections"}, } events, err := eventMapping(jolokiaResponse, mapping) @@ -99,3 +172,46 @@ func TestEventMapperWithWildcard(t *testing.T) { assert.ElementsMatch(t, expected, events) } + +func TestEventGroupingMapperWithWildcard(t *testing.T) { + absPath, err := filepath.Abs("./_meta/test") + + assert.NotNil(t, absPath) + assert.Nil(t, err) + + jolokiaResponse, err := ioutil.ReadFile(absPath + "/jolokia_response_wildcard.json") + + assert.Nil(t, err) + + var mapping = AttributeMapping{ + attributeMappingKey{"Catalina:name=*,type=ThreadPool", "port"}: Attribute{ + Attr: "port", Field: "port", Event: "port"}, + attributeMappingKey{"Catalina:name=*,type=ThreadPool", "maxConnections"}: Attribute{ + Attr: "maxConnections", Field: "max_connections", Event: "network"}, + } + + events, err := eventMapping(jolokiaResponse, mapping) + assert.Nil(t, err) + assert.Equal(t, 4, len(events)) + + expected := []common.MapStr{ + { + "mbean": "Catalina:name=\"http-bio-8080\",type=ThreadPool", + "port": float64(8080), + }, + { + "mbean": "Catalina:name=\"http-bio-8080\",type=ThreadPool", + "max_connections": float64(200), + }, + { + "mbean": "Catalina:name=\"ajp-bio-8009\",type=ThreadPool", + "port": float64(8009), + }, + { + "mbean": "Catalina:name=\"ajp-bio-8009\",type=ThreadPool", + "max_connections": float64(200), + }, + } + + assert.ElementsMatch(t, expected, events) +} diff --git a/metricbeat/module/jolokia/jmx/jmx.go b/metricbeat/module/jolokia/jmx/jmx.go index 3dbf69f4d03..c06b58268c5 100644 --- a/metricbeat/module/jolokia/jmx/jmx.go +++ b/metricbeat/module/jolokia/jmx/jmx.go @@ -37,7 +37,7 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet - mapping map[string]string + mapping AttributeMapping namespace string http *helper.HTTP log *logp.Logger