Skip to content

Commit

Permalink
Metricbeat: Support wildcards in jolokia (#6453) (#6462)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano authored and ruflin committed Mar 26, 2018
1 parent 7e6870f commit 7ac8e2f
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 106 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Making the MongoDB module GA. {pull}6554[6554]
- Allow to disable labels `dedot` in Docker module, in favor of a safe way to keep dots. {pull}6490[6490]
- Add experimental module to collect metrics from munin nodes. {pull}6517[6517]
- Add support for wildcards and explicit metrics grouping in jolokia/jmx. {pull}6462[6462]

*Packetbeat*

Expand Down
8 changes: 7 additions & 1 deletion metricbeat/module/jolokia/jmx/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mapping:
attributes:
- attr: Uptime
field: uptime
event: uptime
---

In case the underlying attribute is an object (e.g. see HeapMemoryUsage attribute in java.lang:type=Memory) its
Expand All @@ -34,11 +35,16 @@ All metrics from a single mapping will be POSTed to the defined host/port and se
To make it possible to differentiate between metrics from multiple similar applications running on the same host,
please configure multiple modules.

When wildcards are used, an event will be sent to Elastic for each matching mbean, in that case a `mbean` field is added.

Optionally, an `event` name can be added to each attribute, this makes all metrics with the same `event`
to be grouped in the same event when being sent to Elastic.

It is required to set a namespace in the general module config section.

[float]
=== Limitations
No authentication against Jolokia is supported yet. No wildcards in Jolokia requests supported yet.
No authentication against Jolokia is supported yet.
All Jolokia requests have canonicalNaming set to false (details see here: https://jolokia.org/reference/html/protocol.html).


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[
{
"value": {
"Catalina:name=\"http-bio-8080\",type=ThreadPool": {
"maxConnections": 200,
"port": 8080
},
"Catalina:name=\"ajp-bio-8009\",type=ThreadPool": {
"maxConnections": 200,
"port": 8009
}
},
"request": {
"type": "read",
"attribute": [
"port",
"maxConnections"
],
"mbean": "Catalina:name=*,type=ThreadPool"
},
"status": 200,
"timestamp": 1520469345
}
]
21 changes: 18 additions & 3 deletions metricbeat/module/jolokia/jmx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type JMXMapping struct {
type Attribute struct {
Attr string
Field string
Event string
}

// RequestBlock is used to build the request blocks of the following format:
Expand Down Expand Up @@ -37,8 +38,22 @@ type RequestBlock struct {
Attribute []string `json:"attribute"`
}

func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]string, error) {
responseMapping := map[string]string{}
type attributeMappingKey struct {
mbean, attr string
}

// AttributeMapping contains the mapping information between attributes in Jolokia
// responses and fields in metricbeat events
type AttributeMapping map[attributeMappingKey]Attribute

// Get the mapping options for the attribute of an mbean
func (m AttributeMapping) Get(mbean, attr string) (Attribute, bool) {
a, found := m[attributeMappingKey{mbean, attr}]
return a, found
}

func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, AttributeMapping, error) {
responseMapping := make(AttributeMapping)
var blocks []RequestBlock

for _, mapping := range mappings {
Expand All @@ -49,7 +64,7 @@ func buildRequestBodyAndMapping(mappings []JMXMapping) ([]byte, map[string]strin

for _, attribute := range mapping.Attributes {
rb.Attribute = append(rb.Attribute, attribute.Attr)
responseMapping[mapping.MBean+"_"+attribute.Attr] = attribute.Field
responseMapping[attributeMappingKey{mapping.MBean, attribute.Attr}] = attribute
}
blocks = append(blocks, rb)
}
Expand Down
116 changes: 94 additions & 22 deletions metricbeat/module/jolokia/jmx/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package jmx

import (
"encoding/json"
"strings"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
)

const (
mbeanEventKey = "mbean"
)

type Entry struct {
Request struct {
Mbean string `json:"mbean"`
Expand Down Expand Up @@ -47,56 +52,123 @@ type Entry struct {
// "status": 200
// }
// ]
func eventMapping(content []byte, mapping map[string]string) (common.MapStr, error) {
//
// With wildcards there is an additional nesting level:
//
// [
// {
// "request": {
// "type": "read",
// "attribute": "maxConnections",
// "mbean": "Catalina:name=*,type=ThreadPool"
// },
// "value": {
// "Catalina:name=\"http-bio-8080\",type=ThreadPool": {
// "maxConnections": 200
// },
// "Catalina:name=\"ajp-bio-8009\",type=ThreadPool": {
// "maxConnections": 200
// }
// },
// "timestamp": 1519409583
// "status": 200,
// }
// }
type eventKey struct {
mbean, event string
}

func eventMapping(content []byte, mapping AttributeMapping) ([]common.MapStr, error) {
var entries []Entry
if err := json.Unmarshal(content, &entries); err != nil {
return nil, errors.Wrapf(err, "failed to unmarshal jolokia JSON response '%v'", string(content))
}

event := common.MapStr{}
// Generate a different event for each wildcard mbean, and and additional one
// for non-wildcard requested mbeans, group them by event name if defined
mbeanEvents := make(map[eventKey]common.MapStr)
var errs multierror.Errors

for _, v := range entries {
hasWildcard := strings.Contains(v.Request.Mbean, "*")
for attribute, value := range v.Value {
// Extend existing event
err := parseResponseEntry(v.Request.Mbean, attribute, value, event, mapping)
if err != nil {
errs = append(errs, err)
if !hasWildcard {
err := parseResponseEntry(v.Request.Mbean, v.Request.Mbean, attribute, value, mbeanEvents, mapping)
if err != nil {
errs = append(errs, err)
}
continue
}

// If there was a wildcard, we are going to have an additional
// nesting level in response values, and attribute here is going
// to be actually the matching mbean name
values, ok := value.(map[string]interface{})
if !ok {
errs = append(errs, errors.Errorf("expected map of values for %s", v.Request.Mbean))
continue
}

responseMbean := attribute
for attribute, value := range values {
err := parseResponseEntry(v.Request.Mbean, responseMbean, attribute, value, mbeanEvents, mapping)
if err != nil {
errs = append(errs, err)
}
}
}
}

return event, errs.Err()
var events []common.MapStr
for _, event := range mbeanEvents {
events = append(events, event)
}

return events, errs.Err()
}

func selectEvent(events map[eventKey]common.MapStr, key eventKey) common.MapStr {
event, found := events[key]
if !found {
event = common.MapStr{}
if key.mbean != "" {
event.Put(mbeanEventKey, key.mbean)
}
events[key] = event
}
return event
}

func parseResponseEntry(
mbeanName string,
requestMbeanName string,
responseMbeanName string,
attributeName string,
attibuteValue interface{},
event common.MapStr,
mapping map[string]string,
attributeValue interface{},
events map[eventKey]common.MapStr,
mapping AttributeMapping,
) error {
// Create metric name by merging mbean and attribute fields.
var metricName = mbeanName + "_" + attributeName

key, exists := mapping[metricName]
field, exists := mapping.Get(requestMbeanName, attributeName)
if !exists {
return errors.Errorf("metric key '%v' not found in response", metricName)
return errors.Errorf("metric key '%v' not found in response (%+v)", attributeName, mapping)
}

var err error
var key eventKey
key.event = field.Event
if responseMbeanName != requestMbeanName {
key.mbean = responseMbeanName
}
event := selectEvent(events, key)

// In case the attributeValue is a map the keys are dedotted
c, ok := attibuteValue.(map[string]interface{})
data := attributeValue
c, ok := data.(map[string]interface{})
if ok {
newData := map[string]interface{}{}
for k, v := range c {
newData[common.DeDot(k)] = v
}
_, err = event.Put(key, newData)
} else {
_, err = event.Put(key, attibuteValue)
data = newData
}

_, err := event.Put(field.Field, data)
return err
}
Loading

0 comments on commit 7ac8e2f

Please sign in to comment.