Skip to content

Commit

Permalink
Metricbeat: Add option to Jolokia/JMX mappings to explicitely group m…
Browse files Browse the repository at this point in the history
…etrics

A new optional `event` option is added to attribute mappings, so
attributes with the same `event` are sent in the same event to Elastic
and attributes with different `event` are sent in different events.

Some additional related refactorings done to improve type safety.
  • Loading branch information
jsoriano committed Mar 22, 2018
1 parent efcb8ec commit f3bb1ca
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Making the MongoDB module GA. {pull}6554[6554]
- Allow to disable labels `dedot` in Docker module, in favor of a safe way to keep dots. {pull}6490[6490]
- Add experimental module to collect metrics from munin nodes. {pull}6517[6517]
- Add support for wildcards and explicit metrics grouping in jolokia/jmx. {pull}6462[6462]

*Packetbeat*

Expand Down
7 changes: 5 additions & 2 deletions metricbeat/module/jolokia/jmx/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mapping:
attributes:
- attr: Uptime
field: uptime
event: uptime
---

In case the underlying attribute is an object (e.g. see HeapMemoryUsage attribute in java.lang:type=Memory) its
Expand All @@ -34,8 +35,10 @@ All metrics from a single mapping will be POSTed to the defined host/port and se
To make it possible to differentiate between metrics from multiple similar applications running on the same host,
please configure multiple modules.

When wildcards are used, an event will be sent to Elastic for each matching mbean, in that case a `mbean` field is
added.
When wildcards are used, an event will be sent to Elastic for each matching mbean, in that case a `mbean` field is added.

Optionally, an `event` name can be added to each attribute, this makes all metrics with the same `event`
to be grouped in the same event when being sent to Elastic.

It is required to set a namespace in the general module config section.

Expand Down
21 changes: 18 additions & 3 deletions metricbeat/module/jolokia/jmx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type JMXMapping struct {
type Attribute struct {
Attr string
Field string
Event string
}

// RequestBlock is used to build the request blocks of the following format:
Expand Down Expand Up @@ -37,8 +38,22 @@ type RequestBlock struct {
Attribute []string `json:"attribute"`
}

func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]string, error) {
responseMapping := map[string]string{}
type attributeMappingKey struct {
mbean, attr string
}

// AttributeMapping contains the mapping information between attributes in Jolokia
// responses and fields in metricbeat events
type AttributeMapping map[attributeMappingKey]Attribute

// Get the mapping options for the attribute of an mbean
func (m AttributeMapping) Get(mbean, attr string) (Attribute, bool) {
a, found := m[attributeMappingKey{mbean, attr}]
return a, found
}

func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, AttributeMapping, error) {
responseMapping := make(AttributeMapping)
var blocks []RequestBlock

for _, mapping := range mappings {
Expand All @@ -49,7 +64,7 @@ func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]strin

for _, attribute := range mapping.Attributes {
rb.Attribute = append(rb.Attribute, attribute.Attr)
responseMapping[mapping.MBean+"_"+attribute.Attr] = attribute.Field
responseMapping[attributeMappingKey{mapping.MBean, attribute.Attr}] = attribute
}
blocks = append(blocks, rb)
}
Expand Down
71 changes: 39 additions & 32 deletions metricbeat/module/jolokia/jmx/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,26 @@ type Entry struct {
// "status": 200,
// }
// }
func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, error) {
type eventKey struct {
mbean, event string
}

func eventMapping(content []byte, mapping AttributeMapping) ([]common.MapStr, error) {
var entries []Entry
if err := json.Unmarshal(content, &entries); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal jolokia JSON response '%v'", string(content))
}

// Generate a different event for each wildcard mbean, and and additional one
// for non-wildcard requested mbeans
mbeanEvents := make(map[string]common.MapStr)
// for non-wildcard requested mbeans, group them by event name if defined
mbeanEvents := make(map[eventKey]common.MapStr)
var errs multierror.Errors

for _, v := range entries {
hasWildcard := strings.Contains(v.Request.Mbean, "*")
for attribute, value := range v.Value {
if !strings.Contains(v.Request.Mbean, "*") {
event, found := mbeanEvents[""]
if !found {
event = common.MapStr{}
mbeanEvents[""] = event
}
err := parseResponseEntry(v.Request.Mbean, attribute, value, event, mapping)
if !hasWildcard {
err := parseResponseEntry(v.Request.Mbean, v.Request.Mbean, attribute, value, mbeanEvents, mapping)
if err != nil {
errs = append(errs, err)
}
Expand All @@ -110,15 +110,8 @@ func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, e
}

responseMbean := attribute
event, found := mbeanEvents[responseMbean]
if !found {
event = common.MapStr{}
event.Put(mbeanEventKey, responseMbean)
mbeanEvents[responseMbean] = event
}

for attribute, value := range values {
err := parseResponseEntry(v.Request.Mbean, attribute, value, event, mapping)
err := parseResponseEntry(v.Request.Mbean, responseMbean, attribute, value, mbeanEvents, mapping)
if err != nil {
errs = append(errs, err)
}
Expand All @@ -134,34 +127,48 @@ func eventMapping(content []byte, mapping map[string]string) ([]common.MapStr, e
return events, errs.Err()
}

func selectEvent(events map[eventKey]common.MapStr, key eventKey) common.MapStr {
event, found := events[key]
if !found {
event = common.MapStr{}
if key.mbean != "" {
event.Put(mbeanEventKey, key.mbean)
}
events[key] = event
}
return event
}

func parseResponseEntry(
requestMbeanName string,
responseMbeanName string,
attributeName string,
attibuteValue interface{},
event common.MapStr,
mapping map[string]string,
attributeValue interface{},
events map[eventKey]common.MapStr,
mapping AttributeMapping,
) error {
// Create metric name by merging mbean and attribute fields.
var metricName = requestMbeanName + "_" + attributeName

key, exists := mapping[metricName]
field, exists := mapping.Get(requestMbeanName, attributeName)
if !exists {
return errors.Errorf("metric key '%v' not found in response (%+v)", metricName, mapping)
return errors.Errorf("metric key '%v' not found in response (%+v)", attributeName, mapping)
}

var err error
var key eventKey
key.event = field.Event
if responseMbeanName != requestMbeanName {
key.mbean = responseMbeanName
}
event := selectEvent(events, key)

// In case the attributeValue is a map the keys are dedotted
c, ok := attibuteValue.(map[string]interface{})
data := attributeValue
c, ok := data.(map[string]interface{})
if ok {
newData := map[string]interface{}{}
for k, v := range c {
newData[common.DeDot(k)] = v
}
_, err = event.Put(key, newData)
} else {
_, err = event.Put(key, attibuteValue)
data = newData
}

_, err := event.Put(field.Field, data)
return err
}
136 changes: 126 additions & 10 deletions metricbeat/module/jolokia/jmx/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ func TestEventMapper(t *testing.T) {

assert.Nil(t, err)

var mapping = map[string]string{
"java.lang:type=Runtime_Uptime": "uptime",
"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep_CollectionTime": "gc.cms_collection_time",
"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep_CollectionCount": "gc.cms_collection_count",
"java.lang:type=Memory_HeapMemoryUsage": "memory.heap_usage",
"java.lang:type=Memory_NonHeapMemoryUsage": "memory.non_heap_usage",
"org.springframework.boot:type=Endpoint,name=metricsEndpoint_Metrics": "metrics",
var mapping = AttributeMapping{
attributeMappingKey{"java.lang:type=Runtime", "Uptime"}: Attribute{
Attr: "Uptime", Field: "uptime"},
attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionTime"}: Attribute{
Attr: "CollectionTime", Field: "gc.cms_collection_time"},
attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionCount"}: Attribute{
Attr: "CollectionCount", Field: "gc.cms_collection_count"},
attributeMappingKey{"java.lang:type=Memory", "HeapMemoryUsage"}: Attribute{
Attr: "HeapMemoryUsage", Field: "memory.heap_usage"},
attributeMappingKey{"java.lang:type=Memory", "NonHeapMemoryUsage"}: Attribute{
Attr: "NonHEapMemoryUsage", Field: "memory.non_heap_usage"},
attributeMappingKey{"org.springframework.boot:type=Endpoint,name=metricsEndpoint", "Metrics"}: Attribute{
Attr: "Metrics", Field: "metrics"},
}

events, err := eventMapping(jolokiaResponse, mapping)
Expand Down Expand Up @@ -65,6 +71,71 @@ func TestEventMapper(t *testing.T) {
assert.ElementsMatch(t, expected, events)
}

func TestEventGroupingMapper(t *testing.T) {
absPath, err := filepath.Abs("./_meta/test")

assert.NotNil(t, absPath)
assert.Nil(t, err)

jolokiaResponse, err := ioutil.ReadFile(absPath + "/jolokia_response.json")

assert.Nil(t, err)

var mapping = AttributeMapping{
attributeMappingKey{"java.lang:type=Runtime", "Uptime"}: Attribute{
Attr: "Uptime", Field: "uptime"},
attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionTime"}: Attribute{
Attr: "CollectionTime", Field: "gc.cms_collection_time", Event: "gc"},
attributeMappingKey{"java.lang:type=GarbageCollector,name=ConcurrentMarkSweep", "CollectionCount"}: Attribute{
Attr: "CollectionCount", Field: "gc.cms_collection_count", Event: "gc"},
attributeMappingKey{"java.lang:type=Memory", "HeapMemoryUsage"}: Attribute{
Attr: "HeapMemoryUsage", Field: "memory.heap_usage", Event: "memory"},
attributeMappingKey{"java.lang:type=Memory", "NonHeapMemoryUsage"}: Attribute{
Attr: "NonHEapMemoryUsage", Field: "memory.non_heap_usage", Event: "memory"},
attributeMappingKey{"org.springframework.boot:type=Endpoint,name=metricsEndpoint", "Metrics"}: Attribute{
Attr: "Metrics", Field: "metrics"},
}

events, err := eventMapping(jolokiaResponse, mapping)
assert.Nil(t, err)

expected := []common.MapStr{
{
"uptime": float64(47283),
"metrics": map[string]interface{}{
"atomikos_nbTransactions": float64(0),
"classes": float64(18857),
"classes_loaded": float64(19127),
"classes_unloaded": float64(270),
},
},
{
"gc": common.MapStr{
"cms_collection_time": float64(53),
"cms_collection_count": float64(1),
},
},
{
"memory": common.MapStr{
"heap_usage": map[string]interface{}{
"init": float64(1073741824),
"committed": float64(1037959168),
"max": float64(1037959168),
"used": float64(227420472),
},
"non_heap_usage": map[string]interface{}{
"init": float64(2555904),
"committed": float64(53477376),
"max": float64(-1),
"used": float64(50519768),
},
},
},
}

assert.ElementsMatch(t, expected, events)
}

func TestEventMapperWithWildcard(t *testing.T) {
absPath, err := filepath.Abs("./_meta/test")

Expand All @@ -75,9 +146,11 @@ func TestEventMapperWithWildcard(t *testing.T) {

assert.Nil(t, err)

var mapping = map[string]string{
"Catalina:name=*,type=ThreadPool_port": "port",
"Catalina:name=*,type=ThreadPool_maxConnections": "max_connections",
var mapping = AttributeMapping{
attributeMappingKey{"Catalina:name=*,type=ThreadPool", "port"}: Attribute{
Attr: "port", Field: "port"},
attributeMappingKey{"Catalina:name=*,type=ThreadPool", "maxConnections"}: Attribute{
Attr: "maxConnections", Field: "max_connections"},
}

events, err := eventMapping(jolokiaResponse, mapping)
Expand All @@ -99,3 +172,46 @@ func TestEventMapperWithWildcard(t *testing.T) {

assert.ElementsMatch(t, expected, events)
}

func TestEventGroupingMapperWithWildcard(t *testing.T) {
absPath, err := filepath.Abs("./_meta/test")

assert.NotNil(t, absPath)
assert.Nil(t, err)

jolokiaResponse, err := ioutil.ReadFile(absPath + "/jolokia_response_wildcard.json")

assert.Nil(t, err)

var mapping = AttributeMapping{
attributeMappingKey{"Catalina:name=*,type=ThreadPool", "port"}: Attribute{
Attr: "port", Field: "port", Event: "port"},
attributeMappingKey{"Catalina:name=*,type=ThreadPool", "maxConnections"}: Attribute{
Attr: "maxConnections", Field: "max_connections", Event: "network"},
}

events, err := eventMapping(jolokiaResponse, mapping)
assert.Nil(t, err)
assert.Equal(t, 4, len(events))

expected := []common.MapStr{
{
"mbean": "Catalina:name=\"http-bio-8080\",type=ThreadPool",
"port": float64(8080),
},
{
"mbean": "Catalina:name=\"http-bio-8080\",type=ThreadPool",
"max_connections": float64(200),
},
{
"mbean": "Catalina:name=\"ajp-bio-8009\",type=ThreadPool",
"port": float64(8009),
},
{
"mbean": "Catalina:name=\"ajp-bio-8009\",type=ThreadPool",
"max_connections": float64(200),
},
}

assert.ElementsMatch(t, expected, events)
}
2 changes: 1 addition & 1 deletion metricbeat/module/jolokia/jmx/jmx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (
// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
mapping map[string]string
mapping AttributeMapping
namespace string
http *helper.HTTP
log *logp.Logger
Expand Down

0 comments on commit f3bb1ca

Please sign in to comment.