Skip to content

Commit

Permalink
Consistency in Elastic stack metricsets' code (#8308)
Browse files Browse the repository at this point in the history
* Consistent error handling in elasticsearch/cluster_stats metricset

* Consistent error handling in elasticsearch/index_recovery metricset

* Remove punctuation from error messages

* Inline variable

* Reordering imports

* Adding periods to the ends of godoc comments

* More consistency cleanup

* More consistency fixes

* More consistency fixes

* Fixing API path

* Consistent code in elasticsearch/pending_tasks metricset

* More code consistency

* Consistent code in elasticsearch/shard metricset

* Consistent code in elasticsearch/ccr metricset

* Making code in kibana module metricsets consistent

* Making fully-qualified metricset name consistent

* Use elasticsearch.ModuleName constant instead of string literal

* Making logstash/node metricset code consistent

* Making the logstash/node_stats metricset code consistent

* Refactoring common reporting and error pattern into helper function

* Updating unit tests

* Changes from running make fmt

* Re-running make fmt after downgrading golang to 10.3

* Fixes due to make update

* Updating LS module integration tests

* Update kibana/status integration test

* Report error with event (for non x-pack path)

* Attaching errors to events

* Fixing imports in integration tests

* Fixing error

(cherry picked from commit 3b2399d)
  • Loading branch information
ycombinator committed Nov 2, 2018
1 parent 3fd1032 commit 8958e89
Show file tree
Hide file tree
Showing 47 changed files with 532 additions and 331 deletions.
8 changes: 8 additions & 0 deletions metricbeat/helper/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
)
Expand Down Expand Up @@ -108,3 +110,9 @@ func IsFeatureAvailable(currentProductVersion, featureAvailableInProductVersion

return !currentVersion.LessThan(wantVersion), nil
}

// ReportAndLogError reports and logs the given error
func ReportAndLogError(err error, r mb.ReporterV2, l *logp.Logger) {
r.Error(err)
l.Error(err)
}
27 changes: 15 additions & 12 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -58,49 +58,52 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
err = errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Log)
return
}

// Not master, no event sent
if !isMaster {
logp.Debug(elasticsearch.ModuleName, "trying to fetch ccr stats from a non master node.")
m.Log.Debug("trying to fetch ccr stats from a non-master node")
return
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

elasticsearchVersion := info.Version.Number
isCCRStatsAPIAvailable, err := elasticsearch.IsCCRStatsAPIAvailable(elasticsearchVersion)
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

if !isCCRStatsAPIAvailable {
const errorMsg = "the %v metricset is only supported with Elasticsearch >= %v. " +
"You are currently running Elasticsearch %v"
r.Error(fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
err = fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion)
elastic.ReportAndLogError(err, r, m.Log)
return
}

content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

if m.XPack {
eventsMappingXPack(r, m, *info, content)
err = eventsMappingXPack(r, m, *info, content)
} else {
err = eventsMapping(r, *info, content)
if err != nil {
r.Error(err)
return
}
}

if err != nil {
m.Log.Error(err)
return
}
}
33 changes: 19 additions & 14 deletions metricbeat/module/elasticsearch/ccr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,40 +60,45 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
return err
}

var errors multierror.Errors
var errs multierror.Errors
for _, followerShards := range data {

shards, ok := followerShards.([]interface{})
if !ok {
err := fmt.Errorf("shards is not an array")
errors = append(errors, err)
errs = append(errs, err)
r.Error(err)
continue
}

for _, s := range shards {
event := mb.Event{}
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

shard, ok := s.(map[string]interface{})
if !ok {
err := fmt.Errorf("shard is not an object")
errors = append(errors, err)
event.Error = fmt.Errorf("shard is not an object")
r.Event(event)
errs = append(errs, event.Error)
continue
}
event := mb.Event{}

event.MetricSetFields, err = schema.Apply(shard)
if err != nil {
errors = append(errors, err)
event.Error = errors.Wrap(err, "failure applying shard schema")
r.Event(event)
errs = append(errs, event.Error)
continue
}

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

r.Event(event)
}
}

return errors.Err()
return errs.Err()
}
4 changes: 1 addition & 3 deletions metricbeat/module/elasticsearch/ccr/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
}

var errors multierror.Errors
Expand Down
17 changes: 11 additions & 6 deletions metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -57,19 +57,20 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath)
if err != nil {
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
err := errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Log)
return
}

// Not master, no event sent
if !isMaster {
logp.Debug(elasticsearch.ModuleName, "trying to fetch cluster stats from a non master node.")
m.Log.Debug("trying to fetch cluster stats from a non-master node")
return
}

content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

Expand All @@ -80,8 +81,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}

if m.MetricSet.XPack {
eventMappingXPack(r, m, *info, content)
err = eventMappingXPack(r, m, *info, content)
} else {
eventMapping(r, *info, content)
err = eventMapping(r, *info, content)
}

if err != nil {
m.Log.Error(err)
}
}
11 changes: 6 additions & 5 deletions metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) erro
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Error(err)
event.Error = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Event(event)
return event.Error
}

metricSetFields, err := schema.Apply(data)
if err != nil {
err = errors.Wrap(err, "failure applying cluster stats schema")
r.Error(err)
return err
event.Error = errors.Wrap(err, "failure applying cluster stats schema")
r.Event(event)
return event.Error
}

event.MetricSetFields = metricSetFields
Expand Down
28 changes: 14 additions & 14 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (
"github.com/elastic/beats/metricbeat/helper/elastic"
)

// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available
// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available.
const CCRStatsAPIAvailableVersion = "6.5.0"

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id
// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id.
var clusterIDCache = map[string]string{}

// ModuleName is the ame of this module
// ModuleName is the name of this module.
const ModuleName = "elasticsearch"

// Info construct contains the data from the Elasticsearch / endpoint
Expand All @@ -48,7 +48,7 @@ type Info struct {
} `json:"version"`
}

// NodeInfo struct cotains data about the node
// NodeInfo struct cotains data about the node.
type NodeInfo struct {
Host string `json:"host"`
TransportAddress string `json:"transport_address"`
Expand All @@ -57,7 +57,7 @@ type NodeInfo struct {
ID string
}

// GetClusterID fetches cluster id for given nodeID
// GetClusterID fetches cluster id for given nodeID.
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
// Check if cluster id already cached. If yes, return it.
if clusterID, ok := clusterIDCache[nodeID]; ok {
Expand All @@ -73,7 +73,7 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error)
return info.ClusterID, nil
}

// IsMaster checks if the given node host is a master node
// IsMaster checks if the given node host is a master node.
//
// The detection of the master is done in two steps:
// * Fetch node name from /_nodes/_local/name
Expand Down Expand Up @@ -130,7 +130,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
return clusterStruct.MasterNode, nil
}

// GetInfo returns the data for the Elasticsearch / endpoint
// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

content, err := fetchPath(http, uri, "/")
Expand All @@ -157,7 +157,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
return http.FetchContent()
}

// GetNodeInfo returns the node information
// GetNodeInfo returns the node information.
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
Expand All @@ -184,7 +184,7 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error

// GetLicense returns license information. Since we don't expect license information
// to change frequently, the information is cached for 1 minute to avoid
// hitting Elasticsearch frequently
// hitting Elasticsearch frequently.
func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
// First, check the cache
license := licenseCache.get()
Expand Down Expand Up @@ -218,7 +218,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
return licenseCache.get(), nil
}

// GetClusterState returns cluster state information
// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (common.MapStr, error) {
clusterStateURI := "_cluster/state"
if metrics != nil && len(metrics) > 0 {
Expand All @@ -235,7 +235,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
return clusterState, err
}

// GetStackUsage returns stack usage information
// GetStackUsage returns stack usage information.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
content, err := fetchPath(http, resetURI, "_xpack/usage")
if err != nil {
Expand All @@ -248,7 +248,7 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
}

// PassThruField copies the field at the given path from the given source data object into
// the same path in the given target data object
// the same path in the given target data object.
func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error {
fieldValue, err := sourceData.GetValue(fieldPath)
if err != nil {
Expand All @@ -260,12 +260,12 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error
}

// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version
// of Elasticsearch
// of Elasticsearch.
func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) {
return elastic.IsFeatureAvailable(currentElasticsearchVersion, CCRStatsAPIAvailableVersion)
}

// Global cache for license information. Assumption is that license information changes infrequently
// Global cache for license information. Assumption is that license information changes infrequently.
var licenseCache = &_licenseCache{}

type _licenseCache struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func getEnvPort() string {
// GetConfig returns config for elasticsearch module
func getConfig(metricset string) map[string]interface{} {
return map[string]interface{}{
"module": "elasticsearch",
"module": elasticsearch.ModuleName,
"metricsets": []string{metricset},
"hosts": []string{getEnvHost() + ":" + getEnvPort()},
"index_recovery.active_only": false,
Expand Down
19 changes: 12 additions & 7 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,23 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var errs multierror.Errors
for name, index := range indicesStruct.Indices {
event := mb.Event{}
event.MetricSetFields, err = schema.Apply(index)
if err != nil {
r.Error(err)
errs = append(errs, errors.Wrap(err, "failure applying index schema"))
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

event.MetricSetFields, err = schema.Apply(index)
if err != nil {
event.Error = errors.Wrap(err, "failure applying index schema")
r.Event(event)
errs = append(errs, event.Error)
continue
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name
r.Event(event)
}

Expand Down
Loading

0 comments on commit 8958e89

Please sign in to comment.