Skip to content

Commit

Permalink
Cherry-pick #8445 to 6.x: [Elasticsearch Monitoring] Add cluster_meta…
Browse files Browse the repository at this point in the history
…data to cluster_stats docs (#8990)

Cherry-pick of PR #8445 to 6.x branch. Original message: 

Porting over elastic/elasticsearch#33860 to the Metricbeat Elasticsearch module (X-Pack Monitoring code path).

This PR teaches Elasticsearch X-Pack Monitoring to collect cluster metadata, if any is set, and index it into `cluster_stats` docs in `.monitoring-es-6-mb-*`.

After this PR, `cluster_stats` docs in `.monitoring-es-6-mb-*` will contain an additional top-level `cluster_settings` field like so:

```
{
   ...
   "cluster_settings": {
     "cluster": {
       "metadata": {
         ...
       }
     }
   }
}
```
  • Loading branch information
ycombinator authored Nov 8, 2018
1 parent 5f3f4f5 commit 31173e8
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Add Kafka dashboard. {pull}8457[8457]
- Add untyped metric support to the prometheus module. {pull}8681[8681]
- Release Kafka module as GA. {pull}8854[8854]
- Collect custom cluster `display_name` in `elasticsearch/cluster_stats` metricset. {pull}8445[8445]

*Packetbeat*

Expand Down
24 changes: 24 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,22 @@ func apmIndicesExist(clusterState common.MapStr) (bool, error) {
return false, nil
}

func getClusterMetadataSettings(m *MetricSet) (common.MapStr, error) {
// For security reasons we only get the display_name setting
filterPaths := []string{"*.cluster.metadata.display_name"}
clusterSettings, err := elasticsearch.GetClusterSettingsWithDefaults(m.HTTP, m.HTTP.GetURI(), filterPaths)
if err != nil {
return nil, errors.Wrap(err, "failure to get cluster settings")
}

clusterSettings, err = elasticsearch.MergeClusterSettings(clusterSettings)
if err != nil {
return nil, errors.Wrap(err, "failure to merge cluster settings")
}

return clusterSettings, nil
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
Expand Down Expand Up @@ -219,6 +235,14 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, c
"stack_stats": stackStats,
}

clusterSettings, err := getClusterMetadataSettings(m)
if err != nil {
return err
}
if clusterSettings != nil {
event.RootFields.Put("cluster_settings", clusterSettings)
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

Expand Down
114 changes: 105 additions & 9 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
Expand Down Expand Up @@ -96,7 +98,7 @@ func IsMaster(http *helper.HTTP, uri string) (bool, error) {
}

func getNodeName(http *helper.HTTP, uri string) (string, error) {
content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return "", err
}
Expand All @@ -116,7 +118,7 @@ func getNodeName(http *helper.HTTP, uri string) (string, error) {

func getMasterName(http *helper.HTTP, uri string) (string, error) {
// TODO: evaluate on why when run with ?local=true request does not contain master_node field
content, err := fetchPath(http, uri, "_cluster/state/master_node")
content, err := fetchPath(http, uri, "_cluster/state/master_node", "")
if err != nil {
return "", err
}
Expand All @@ -133,7 +135,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

content, err := fetchPath(http, uri, "/")
content, err := fetchPath(http, uri, "/", "")
if err != nil {
return nil, err
}
Expand All @@ -144,13 +146,13 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
return info, nil
}

func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
func fetchPath(http *helper.HTTP, uri, path string, query string) ([]byte, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
u.Path = path
u.RawQuery = ""
u.RawQuery = query

// Http helper includes the HostData with username and password
http.SetURI(u.String())
Expand All @@ -160,7 +162,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
// GetNodeInfo returns the node information.
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +193,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {

// Not cached, fetch license from Elasticsearch
if license == nil {
content, err := fetchPath(http, resetURI, "_xpack/license")
content, err := fetchPath(http, resetURI, "_xpack/license", "")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +227,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI)
content, err := fetchPath(http, resetURI, clusterStateURI, "")
if err != nil {
return nil, err
}
Expand All @@ -235,9 +237,39 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
return clusterState, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (common.MapStr, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
}

// GetClusterSettings returns cluster settings
func GetClusterSettings(http *helper.HTTP, resetURI string, includeDefaults bool, filterPaths []string) (common.MapStr, error) {
clusterSettingsURI := "_cluster/settings"
var queryParams []string
if includeDefaults {
queryParams = append(queryParams, "include_defaults=true")
}

if filterPaths != nil && len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterSettingsURI, queryString)
if err != nil {
return nil, err
}

var clusterSettings map[string]interface{}
err = json.Unmarshal(content, &clusterSettings)
return clusterSettings, err
}

// GetStackUsage returns stack usage information.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
content, err := fetchPath(http, resetURI, "_xpack/usage")
content, err := fetchPath(http, resetURI, "_xpack/usage", "")
if err != nil {
return nil, err
}
Expand All @@ -259,6 +291,47 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error
return nil
}

// MergeClusterSettings merges cluster settings in the correct precedence order
func MergeClusterSettings(clusterSettings common.MapStr) (common.MapStr, error) {
transientSettings, err := getSettingGroup(clusterSettings, "transient")
if err != nil {
return nil, err
}

persistentSettings, err := getSettingGroup(clusterSettings, "persistent")
if err != nil {
return nil, err
}

settings, err := getSettingGroup(clusterSettings, "default")
if err != nil {
return nil, err
}

// Transient settings override persistent settings which override default settings
if settings == nil {
settings = persistentSettings
}

if settings == nil {
settings = transientSettings
}

if settings == nil {
return nil, nil
}

if persistentSettings != nil {
settings.DeepUpdate(persistentSettings)
}

if transientSettings != nil {
settings.DeepUpdate(transientSettings)
}

return settings, nil
}

// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version
// of Elasticsearch.
func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) {
Expand Down Expand Up @@ -295,3 +368,26 @@ func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) {
c.ttl = ttl
c.cachedOn = time.Now()
}

func getSettingGroup(allSettings common.MapStr, groupKey string) (common.MapStr, error) {
hasSettingGroup, err := allSettings.HasKey(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to determine if "+groupKey+" settings exist")
}

if !hasSettingGroup {
return nil, nil
}

settings, err := allSettings.GetValue(groupKey)
if err != nil {
return nil, errors.Wrap(err, "failure to extract "+groupKey+" settings")
}

v, ok := settings.(map[string]interface{})
if !ok {
return nil, errors.Wrap(err, groupKey+" settings are not a map")
}

return common.MapStr(v), nil
}

0 comments on commit 31173e8

Please sign in to comment.