Skip to content

Commit

Permalink
Cherry-pick elastic#8551 to 6.x: Better error messaging for Elastic s…
Browse files Browse the repository at this point in the history
…tacks' metricsets (elastic#8577)

Cherry-pick of PR elastic#8551 to 6.x branch. Original message: 

This PR makes error messages better and more consistent across all metricsets for Elastic stack products, including the xpack monitoring code paths:

* [x] `elasticsearch/ccr`
* [x] `elasticsearch/cluster_stats`
* [x] `elasticsearch/index`
* [x] `elasticsearch/index_recovery`
* [x] `elasticsearch/index_summary`
* [x] `elasticsearch/ml_job`
* [x] `elasticsearch/node`
* [x] `elasticsearch/node_stats`
* [x] `elasticsearch/pending_tasks`
* [x] `elasticsearch/shard`
* [x] `kibana/status`
* [x] `kibana/stats`
* [x] `logstash/node`
* [x] `logstash/node_stats`
  • Loading branch information
ycombinator authored Oct 5, 2018
1 parent 37cc410 commit 252e6cd
Show file tree
Hide file tree
Showing 48 changed files with 504 additions and 250 deletions.
6 changes: 6 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ func (b *BaseMetricSet) Name() string {
return b.name
}

// FullyQualifiedName returns the complete name of the MetricSet, including the
// name of the module.
func (b *BaseMetricSet) FullyQualifiedName() string {
return b.Module().Name() + "/" + b.Name()
}

// Module returns the parent Module for the MetricSet.
func (b *BaseMetricSet) Module() Module {
return b.module
Expand Down
14 changes: 8 additions & 6 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package ccr
import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "ccr", New,
mb.Registry.MustAddMetricSet(elasticsearch.ModuleName, "ccr", New,
mb.WithHostParser(elasticsearch.HostParser),
)
}
Expand All @@ -43,7 +45,7 @@ type MetricSet struct {

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch ccr metricset is beta")
cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta")

ms, err := elasticsearch.NewMetricSet(base, ccrStatsPath)
if err != nil {
Expand All @@ -56,13 +58,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(err)
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
return
}

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch ccr stats from a non master node.")
logp.Debug(elasticsearch.ModuleName, "trying to fetch ccr stats from a non master node.")
return
}

Expand All @@ -80,9 +82,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}

if !isCCRStatsAPIAvailable {
const errorMsg = "the elasticsearch ccr metricset is only supported with Elasticsearch >= %v. " +
const errorMsg = "the %v metricset is only supported with Elasticsearch >= %v. " +
"You are currently running Elasticsearch %v"
r.Error(fmt.Errorf(errorMsg, elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
r.Error(fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
return
}

Expand Down
4 changes: 3 additions & 1 deletion metricbeat/module/elasticsearch/ccr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
Expand Down Expand Up @@ -54,6 +55,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
}
Expand Down Expand Up @@ -83,7 +85,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
}

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/elasticsearch/ccr/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
Expand All @@ -34,6 +35,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package cluster_stats

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -44,7 +44,7 @@ type MetricSet struct {

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch cluster_stats metricset is beta")
cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta")

ms, err := elasticsearch.NewMetricSet(base, clusterStatsPath)
if err != nil {
Expand All @@ -57,13 +57,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath)
if err != nil {
r.Error(fmt.Errorf("Error fetching master info: %s", err))
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
return
}

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
logp.Debug(elasticsearch.ModuleName, "trying to fetch cluster stats from a non master node.")
return
}

Expand Down
9 changes: 6 additions & 3 deletions metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package cluster_stats
import (
"encoding/json"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"

s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var (
Expand Down Expand Up @@ -56,12 +57,14 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Error(err)
return err
}

metricSetFields, err := schema.Apply(data)
if err != nil {
err = errors.Wrap(err, "failure applying cluster stats schema")
r.Error(err)
return err
}
Expand All @@ -73,7 +76,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error {

var event mb.Event
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", clusterName)
Expand Down
23 changes: 12 additions & 11 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
"strings"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
Expand Down Expand Up @@ -145,7 +146,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
return errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
}

clusterStats := common.MapStr(data)
Expand All @@ -161,44 +162,44 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {

info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get info from Elasticsearch")
}

license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get license from Elasticsearch")
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
return err
return errors.Wrap(err, "failed to get cluster state from Elasticsearch")
}

if err = elasticsearch.PassThruField("status", clusterStats, clusterState); err != nil {
return err
return errors.Wrap(err, "failed to pass through status field")
}

nodesHash, err := computeNodesHash(clusterState)
if err != nil {
return err
return errors.Wrap(err, "failed to compute nodes hash")
}
clusterState.Put("nodes_hash", nodesHash)

usage, err := elasticsearch.GetStackUsage(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get stack usage from Elasticsearch")
}

clusterNeedsTLS, err := clusterNeedsTLSEnabled(license, usage)
if err != nil {
return err
return errors.Wrap(err, "failed to determine if cluster needs TLS enabled")
}
license.Put("cluster_needs_tls", clusterNeedsTLS) // This powers a cluster alert for enabling TLS on the ES transport protocol

isAPMFound, err := apmIndicesExist(clusterState)
if err != nil {
return err
return errors.Wrap(err, "failed to determine if APM indices exist")
}
delete(clusterState, "routing_table") // We don't want to index the routing table in monitoring indices

Expand Down
10 changes: 6 additions & 4 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
Expand Down Expand Up @@ -59,27 +60,28 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var indicesStruct IndicesStruct
err := json.Unmarshal(content, &indicesStruct)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Stats API response")
r.Error(err)
return err
}

var errors multierror.Errors
var errs multierror.Errors
for name, index := range indicesStruct.Indices {
event := mb.Event{}
event.MetricSetFields, err = schema.Apply(index)
if err != nil {
r.Error(err)
errors = append(errors, err)
errs = append(errs, errors.Wrap(err, "failure applying index schema"))
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)
event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)
r.Event(event)
}

return errors.Err()
return errs.Err()
}
22 changes: 12 additions & 10 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,31 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var indicesStruct IndicesStruct
err := json.Unmarshal(content, &indicesStruct)
if err != nil {
m.Log.Errorw("Failure parsing Indices Stats Elasticsearch API response", "error", err)
err = errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
m.Log.Error(err)
return err
}

clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
m.Log.Errorw("Failure retrieving cluster state from Elasticsearch", "error", err)
err = errors.Wrap(err, "failure retrieving cluster state from Elasticsearch")
m.Log.Error(err)
return err
}

for name, index := range indicesStruct.Indices {
event := mb.Event{}
indexStats, err := xpackSchema.Apply(index)
if err != nil {
m.Log.Errorw("Failure applying index stats schema", "error", err)
m.Log.Error(errors.Wrap(err, "failure applying index stats schema"))
continue
}
indexStats["index"] = name

err = addClusterStateFields(name, indexStats, clusterState)
if err != nil {
m.Log.Errorw("Failure adding cluster state fields", "error", err)
m.Log.Error(errors.Wrap(err, "failure adding cluster state fields"))
continue
}

Expand Down Expand Up @@ -201,16 +203,16 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
areAllPrimariesStarted := true
areAllReplicasStarted := true

for _, indexShard := range shards {
for indexName, indexShard := range shards {
is, ok := indexShard.([]interface{})
if !ok {
return "", fmt.Errorf("shards is not an array")
}

for _, shard := range is {
for shardIdx, shard := range is {
s, ok := shard.(map[string]interface{})
if !ok {
return "", fmt.Errorf("shards is not an array of shard objects")
return "", fmt.Errorf("%v.shards[%v] is not a map", indexName, shardIdx)
}

shard := common.MapStr(s)
Expand Down Expand Up @@ -250,16 +252,16 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
initializing := 0
relocating := 0

for _, indexShard := range shards {
for indexName, indexShard := range shards {
is, ok := indexShard.([]interface{})
if !ok {
return nil, fmt.Errorf("shards is not an array")
}

for _, shard := range is {
for shardIdx, shard := range is {
s, ok := shard.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("shards is not an array of shard objects")
return nil, fmt.Errorf("%v.shards[%v] is not a map", indexName, shardIdx)
}

shard := common.MapStr(s)
Expand Down
Loading

0 comments on commit 252e6cd

Please sign in to comment.