diff --git a/metricbeat/helper/elastic/elastic.go b/metricbeat/helper/elastic/elastic.go index 2d37925e89eb..1fc36f038f77 100644 --- a/metricbeat/helper/elastic/elastic.go +++ b/metricbeat/helper/elastic/elastic.go @@ -21,6 +21,8 @@ import ( "fmt" "strings" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" ) @@ -108,3 +110,9 @@ func IsFeatureAvailable(currentProductVersion, featureAvailableInProductVersion return !currentVersion.LessThan(wantVersion), nil } + +// ReportAndLogError reports and logs the given error +func ReportAndLogError(err error, r mb.ReporterV2, l *logp.Logger) { + r.Error(err) + l.Error(err) +} diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go index 8cab209277e6..5b2156e2c50e 100644 --- a/metricbeat/module/elasticsearch/ccr/ccr.go +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -23,7 +23,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -58,49 +58,52 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "trying to fetch ccr stats from a non master node.") + m.Log.Debug("trying to fetch ccr stats from a non-master node") return } info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath) if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } elasticsearchVersion := info.Version.Number isCCRStatsAPIAvailable, err := elasticsearch.IsCCRStatsAPIAvailable(elasticsearchVersion) if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } if !isCCRStatsAPIAvailable { const errorMsg = "the %v metricset is only supported with Elasticsearch >= %v. " + "You are currently running Elasticsearch %v" - r.Error(fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion)) + err = fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion) + elastic.ReportAndLogError(err, r, m.Log) return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } if m.XPack { - eventsMappingXPack(r, m, *info, content) + err = eventsMappingXPack(r, m, *info, content) } else { err = eventsMapping(r, *info, content) - if err != nil { - r.Error(err) - return - } + } + + if err != nil { + m.Log.Error(err) + return } } diff --git a/metricbeat/module/elasticsearch/ccr/data.go b/metricbeat/module/elasticsearch/ccr/data.go index 8c21caa35ce2..0a5548547b31 100644 --- a/metricbeat/module/elasticsearch/ccr/data.go +++ b/metricbeat/module/elasticsearch/ccr/data.go @@ -60,40 +60,45 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err return err } - var errors multierror.Errors + var errs multierror.Errors for _, followerShards := range data { shards, ok := followerShards.([]interface{}) if !ok { err := fmt.Errorf("shards is not an array") - errors = append(errors, err) + errs = append(errs, err) + r.Error(err) continue } for _, s := range shards { + event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", info.ClusterName) + event.ModuleFields.Put("cluster.id", info.ClusterID) + shard, ok := s.(map[string]interface{}) if !ok { - err := fmt.Errorf("shard is not an object") - errors = append(errors, err) + event.Error = fmt.Errorf("shard is not an object") + r.Event(event) + errs = append(errs, event.Error) continue } - event := mb.Event{} + event.MetricSetFields, err = schema.Apply(shard) if err != nil { - errors = append(errors, err) + event.Error = errors.Wrap(err, "failure applying shard schema") + r.Event(event) + errs = append(errs, event.Error) continue } - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) - - event.ModuleFields = common.MapStr{} - event.ModuleFields.Put("cluster.name", info.ClusterName) - event.ModuleFields.Put("cluster.id", info.ClusterID) - r.Event(event) } } - return errors.Err() + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/ccr/data_xpack.go b/metricbeat/module/elasticsearch/ccr/data_xpack.go index bf0bb047ff94..db5688caa9eb 100644 --- a/metricbeat/module/elasticsearch/ccr/data_xpack.go +++ b/metricbeat/module/elasticsearch/ccr/data_xpack.go @@ -35,9 +35,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response") - r.Error(err) - return err + return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response") } var errors multierror.Errors diff --git a/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go b/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go index ae6d289b8212..6f8214a13df2 100644 --- a/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go +++ b/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -57,19 +57,20 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err := errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "trying to fetch cluster stats from a non master node.") + m.Log.Debug("trying to fetch cluster stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } @@ -80,8 +81,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } if m.MetricSet.XPack { - eventMappingXPack(r, m, *info, content) + err = eventMappingXPack(r, m, *info, content) } else { - eventMapping(r, *info, content) + err = eventMapping(r, *info, content) + } + + if err != nil { + m.Log.Error(err) } } diff --git a/metricbeat/module/elasticsearch/cluster_stats/data.go b/metricbeat/module/elasticsearch/cluster_stats/data.go index d0d680562afe..5d0f46b6bef9 100644 --- a/metricbeat/module/elasticsearch/cluster_stats/data.go +++ b/metricbeat/module/elasticsearch/cluster_stats/data.go @@ -64,15 +64,16 @@ func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) erro var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response") - r.Error(err) + event.Error = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response") + r.Event(event) + return event.Error } metricSetFields, err := schema.Apply(data) if err != nil { - err = errors.Wrap(err, "failure applying cluster stats schema") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure applying cluster stats schema") + r.Event(event) + return event.Error } event.MetricSetFields = metricSetFields diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index 468cf294a95b..4e1c60643a42 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -30,13 +30,13 @@ import ( "github.com/elastic/beats/metricbeat/helper/elastic" ) -// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available +// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available. const CCRStatsAPIAvailableVersion = "6.5.0" -// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id +// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id. var clusterIDCache = map[string]string{} -// ModuleName is the ame of this module +// ModuleName is the name of this module. const ModuleName = "elasticsearch" // Info construct contains the data from the Elasticsearch / endpoint @@ -48,7 +48,7 @@ type Info struct { } `json:"version"` } -// NodeInfo struct cotains data about the node +// NodeInfo struct cotains data about the node. type NodeInfo struct { Host string `json:"host"` TransportAddress string `json:"transport_address"` @@ -57,7 +57,7 @@ type NodeInfo struct { ID string } -// GetClusterID fetches cluster id for given nodeID +// GetClusterID fetches cluster id for given nodeID. func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) { // Check if cluster id already cached. If yes, return it. if clusterID, ok := clusterIDCache[nodeID]; ok { @@ -73,7 +73,7 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) return info.ClusterID, nil } -// IsMaster checks if the given node host is a master node +// IsMaster checks if the given node host is a master node. // // The detection of the master is done in two steps: // * Fetch node name from /_nodes/_local/name @@ -130,7 +130,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) { return clusterStruct.MasterNode, nil } -// GetInfo returns the data for the Elasticsearch / endpoint +// GetInfo returns the data for the Elasticsearch / endpoint. func GetInfo(http *helper.HTTP, uri string) (*Info, error) { content, err := fetchPath(http, uri, "/") @@ -157,7 +157,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) { return http.FetchContent() } -// GetNodeInfo returns the node information +// GetNodeInfo returns the node information. func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) { content, err := fetchPath(http, uri, "/_nodes/_local/nodes") @@ -184,7 +184,7 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error // GetLicense returns license information. Since we don't expect license information // to change frequently, the information is cached for 1 minute to avoid -// hitting Elasticsearch frequently +// hitting Elasticsearch frequently. func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) { // First, check the cache license := licenseCache.get() @@ -218,7 +218,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) { return licenseCache.get(), nil } -// GetClusterState returns cluster state information +// GetClusterState returns cluster state information. func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (common.MapStr, error) { clusterStateURI := "_cluster/state" if metrics != nil && len(metrics) > 0 { @@ -235,7 +235,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm return clusterState, err } -// GetStackUsage returns stack usage information +// GetStackUsage returns stack usage information. func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) { content, err := fetchPath(http, resetURI, "_xpack/usage") if err != nil { @@ -248,7 +248,7 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) { } // PassThruField copies the field at the given path from the given source data object into -// the same path in the given target data object +// the same path in the given target data object. func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error { fieldValue, err := sourceData.GetValue(fieldPath) if err != nil { @@ -260,12 +260,12 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error } // IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version -// of Elasticsearch +// of Elasticsearch. func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) { return elastic.IsFeatureAvailable(currentElasticsearchVersion, CCRStatsAPIAvailableVersion) } -// Global cache for license information. Assumption is that license information changes infrequently +// Global cache for license information. Assumption is that license information changes infrequently. var licenseCache = &_licenseCache{} type _licenseCache struct { diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index fca4ecd09109..3a19b5888cf3 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -126,7 +126,7 @@ func getEnvPort() string { // GetConfig returns config for elasticsearch module func getConfig(metricset string) map[string]interface{} { return map[string]interface{}{ - "module": "elasticsearch", + "module": elasticsearch.ModuleName, "metricsets": []string{metricset}, "hosts": []string{getEnvHost() + ":" + getEnvPort()}, "index_recovery.active_only": false, diff --git a/metricbeat/module/elasticsearch/index/data.go b/metricbeat/module/elasticsearch/index/data.go index 241307f776ac..ab86f7708ed3 100644 --- a/metricbeat/module/elasticsearch/index/data.go +++ b/metricbeat/module/elasticsearch/index/data.go @@ -68,18 +68,23 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err var errs multierror.Errors for name, index := range indicesStruct.Indices { event := mb.Event{} - event.MetricSetFields, err = schema.Apply(index) - if err != nil { - r.Error(err) - errs = append(errs, errors.Wrap(err, "failure applying index schema")) - } - // Write name here as full name only available as key - event.MetricSetFields["name"] = name + event.RootFields = common.MapStr{} event.RootFields.Put("service.name", elasticsearch.ModuleName) + event.ModuleFields = common.MapStr{} event.ModuleFields.Put("cluster.name", info.ClusterName) event.ModuleFields.Put("cluster.id", info.ClusterID) + + event.MetricSetFields, err = schema.Apply(index) + if err != nil { + event.Error = errors.Wrap(err, "failure applying index schema") + r.Event(event) + errs = append(errs, event.Error) + continue + } + // Write name here as full name only available as key + event.MetricSetFields["name"] = name r.Event(event) } diff --git a/metricbeat/module/elasticsearch/index/data_xpack.go b/metricbeat/module/elasticsearch/index/data_xpack.go index 73b051c013e8..baab66e8ac58 100644 --- a/metricbeat/module/elasticsearch/index/data_xpack.go +++ b/metricbeat/module/elasticsearch/index/data_xpack.go @@ -23,6 +23,7 @@ import ( "strconv" "time" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -96,31 +97,28 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, var indicesStruct IndicesStruct err := json.Unmarshal(content, &indicesStruct) if err != nil { - err = errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response") - m.Log.Error(err) - return err + return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response") } clusterStateMetrics := []string{"metadata", "routing_table"} clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics) if err != nil { - err = errors.Wrap(err, "failure retrieving cluster state from Elasticsearch") - m.Log.Error(err) - return err + return errors.Wrap(err, "failure retrieving cluster state from Elasticsearch") } + var errs multierror.Errors for name, index := range indicesStruct.Indices { event := mb.Event{} indexStats, err := xpackSchema.Apply(index) if err != nil { - m.Log.Error(errors.Wrap(err, "failure applying index stats schema")) + errs = append(errs, errors.Wrap(err, "failure applying index stats schema")) continue } indexStats["index"] = name err = addClusterStateFields(name, indexStats, clusterState) if err != nil { - m.Log.Error(errors.Wrap(err, "failure adding cluster state fields")) + errs = append(errs, errors.Wrap(err, "failure adding cluster state fields")) continue } @@ -136,7 +134,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, r.Event(event) } - return nil + return errs.Err() } // Fields added here are based on same fields being added by internal collection in diff --git a/metricbeat/module/elasticsearch/index/index.go b/metricbeat/module/elasticsearch/index/index.go index a32888784860..459311e79666 100644 --- a/metricbeat/module/elasticsearch/index/index.go +++ b/metricbeat/module/elasticsearch/index/index.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -46,7 +46,7 @@ type MetricSet struct { // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") // TODO: This currently gets index data for all indices. Make it configurable. ms, err := elasticsearch.NewMetricSet(base, statsPath) @@ -61,35 +61,38 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statsPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch index stats from a non-master node.") + m.Log.Debug("trying to fetch index stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) if err != nil { - r.Error(errors.Wrap(err, "failed to get info from Elasticsearch")) + err = errors.Wrap(err, "failed to get info from Elasticsearch") + elastic.ReportAndLogError(err, r, m.Log) return } if m.XPack { - eventsMappingXPack(r, m, *info, content) + err = eventsMappingXPack(r, m, *info, content) } else { err = eventsMapping(r, *info, content) - if err != nil { - r.Error(err) - return - } + } + + if err != nil { + m.Log.Error(err) + return } } diff --git a/metricbeat/module/elasticsearch/index_recovery/data.go b/metricbeat/module/elasticsearch/index_recovery/data.go index e006ff5c022b..2400fa6a075a 100644 --- a/metricbeat/module/elasticsearch/index_recovery/data.go +++ b/metricbeat/module/elasticsearch/index_recovery/data.go @@ -20,11 +20,13 @@ package index_recovery import ( "encoding/json" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -64,20 +66,32 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { return err } + var errs multierror.Errors for indexName, d := range data { shards, ok := d["shards"] if !ok { + err = elastic.MakeErrorForMissingField(indexName+".shards", elastic.Elasticsearch) + r.Error(err) + errs = append(errs, err) continue } for _, data := range shards { event := mb.Event{} - event.ModuleFields = common.MapStr{} - event.MetricSetFields, _ = schema.Apply(data) - event.ModuleFields.Put("index.name", indexName) + event.RootFields = common.MapStr{} event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("index.name", indexName) + + event.MetricSetFields, err = schema.Apply(data) + if err != nil { + event.Error = errors.Wrap(err, "failure applying shard schema") + errs = append(errs, event.Error) + } + r.Event(event) } } - return nil + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/index_recovery/data_xpack.go b/metricbeat/module/elasticsearch/index_recovery/data_xpack.go index 1a9139a41127..77b504e704da 100644 --- a/metricbeat/module/elasticsearch/index_recovery/data_xpack.go +++ b/metricbeat/module/elasticsearch/index_recovery/data_xpack.go @@ -19,6 +19,7 @@ package index_recovery import ( "encoding/json" + "fmt" "time" "github.com/pkg/errors" @@ -40,23 +41,23 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { for indexName, indexData := range data { indexData, ok := indexData.(map[string]interface{}) if !ok { - continue + return fmt.Errorf("%v is not a map", indexName) } shards, ok := indexData["shards"] if !ok { - continue + return elastic.MakeErrorForMissingField(indexName+".shards", elastic.Elasticsearch) } shardsArr, ok := shards.([]interface{}) if !ok { - continue + return fmt.Errorf("%v.shards is not an array", indexName) } - for _, shard := range shardsArr { + for shardIdx, shard := range shardsArr { shard, ok := shard.(map[string]interface{}) if !ok { - continue + return fmt.Errorf("%v.shards[%v] is not a map", indexName, shardIdx) } shard["index_name"] = indexName diff --git a/metricbeat/module/elasticsearch/index_recovery/index_recovery.go b/metricbeat/module/elasticsearch/index_recovery/index_recovery.go index 5f3ff2ce030a..1a39f8936639 100644 --- a/metricbeat/module/elasticsearch/index_recovery/index_recovery.go +++ b/metricbeat/module/elasticsearch/index_recovery/index_recovery.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -72,31 +72,33 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers stats for each index from the _stats API func (m *MetricSet) Fetch(r mb.ReporterV2) { - isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+m.recoveryPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch index recovery stats from a non-master node.") + m.Log.Debug("trying to fetch index recovery stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } if m.MetricSet.XPack { - eventsMappingXPack(r, m, content) + err = eventsMappingXPack(r, m, content) } else { err = eventsMapping(r, content) - if err != nil { - r.Error(err) - } + } + + if err != nil { + m.Log.Error(err) + return } } diff --git a/metricbeat/module/elasticsearch/index_summary/data.go b/metricbeat/module/elasticsearch/index_summary/data.go index d51643a057e1..88b7312e8e08 100644 --- a/metricbeat/module/elasticsearch/index_summary/data.go +++ b/metricbeat/module/elasticsearch/index_summary/data.go @@ -69,32 +69,32 @@ var ( ) func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { + var event mb.Event + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", info.ClusterName) + event.ModuleFields.Put("cluster.id", info.ClusterID) + var all struct { Data map[string]interface{} `json:"_all"` } err := json.Unmarshal(content, &all) if err != nil { - err = errors.Wrap(err, "failure parsing Elasticsearch Stats API response") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure parsing Elasticsearch Stats API response") + r.Event(event) + return event.Error } fields, err := schema.Apply(all.Data, s.FailOnRequired) if err != nil { - err = errors.Wrap(err, "failure applying stats schema") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure applying stats schema") + r.Event(event) + return event.Error } - var event mb.Event - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) - - event.ModuleFields = common.MapStr{} - event.ModuleFields.Put("cluster.name", info.ClusterName) - event.ModuleFields.Put("cluster.id", info.ClusterID) - event.MetricSetFields = fields r.Event(event) diff --git a/metricbeat/module/elasticsearch/index_summary/data_xpack.go b/metricbeat/module/elasticsearch/index_summary/data_xpack.go index 410a65ae2a73..f4b8da190aed 100644 --- a/metricbeat/module/elasticsearch/index_summary/data_xpack.go +++ b/metricbeat/module/elasticsearch/index_summary/data_xpack.go @@ -72,32 +72,21 @@ var ( } ) -func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) []error { +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { var all struct { Data map[string]interface{} `json:"_all"` } err := json.Unmarshal(content, &all) if err != nil { - return []error{errors.Wrap(err, "failure parsing Elasticsearch Stats API response")} + return errors.Wrap(err, "failure parsing Elasticsearch Stats API response") } - var errs []error - fields, err := schemaXPack.Apply(all.Data) if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying stats schema")) + return errors.Wrap(err, "failure applying stats schema") } - nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statsPath, "") - sourceNode := common.MapStr{ - "uuid": nodeInfo.ID, - "host": nodeInfo.Host, - "transport_address": nodeInfo.TransportAddress, - "ip": nodeInfo.IP, - "name": nodeInfo.Name, - "timestamp": common.Time(time.Now()), - } event := mb.Event{} event.RootFields = common.MapStr{} event.RootFields.Put("indices_stats._all", fields) @@ -105,10 +94,9 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, c event.RootFields.Put("timestamp", common.Time(time.Now())) event.RootFields.Put("interval_ms", m.Module().Config().Period/time.Millisecond) event.RootFields.Put("type", "indices_stats") - event.RootFields.Put("source_node", sourceNode) event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) r.Event(event) - return errs + return nil } diff --git a/metricbeat/module/elasticsearch/index_summary/index_summary.go b/metricbeat/module/elasticsearch/index_summary/index_summary.go index c364216a09ec..0585d441847e 100644 --- a/metricbeat/module/elasticsearch/index_summary/index_summary.go +++ b/metricbeat/module/elasticsearch/index_summary/index_summary.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/elasticsearch" @@ -54,7 +54,7 @@ type MetricSet struct { // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") // Get the stats from the local node ms, err := elasticsearch.NewMetricSet(base, statsPath) @@ -68,32 +68,38 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statsPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch index summary stats from a non-master node.") + m.Log.Debug("trying to fetch index summary stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+statsPath) if err != nil { - r.Error(errors.Wrap(err, "failed to get info from Elasticsearch")) + err = errors.Wrap(err, "failed to get info from Elasticsearch") + elastic.ReportAndLogError(err, r, m.Log) return } if m.XPack { - eventMappingXPack(r, m, *info, content) + err = eventMappingXPack(r, m, *info, content) } else { - eventMapping(r, *info, content) + err = eventMapping(r, *info, content) } + if err != nil { + m.Log.Error(err) + return + } } diff --git a/metricbeat/module/elasticsearch/metricset.go b/metricbeat/module/elasticsearch/metricset.go index 903de3695a29..07a49cfef0c3 100644 --- a/metricbeat/module/elasticsearch/metricset.go +++ b/metricbeat/module/elasticsearch/metricset.go @@ -66,14 +66,13 @@ func NewMetricSet(base mb.BaseMetricSet, subPath string) (*MetricSet, error) { } if config.XPack { - cfgwarn.Experimental("The experimental xpack.enabled flag in elasticsearch/node_stats metricset is enabled.") + cfgwarn.Experimental("The experimental xpack.enabled flag in " + base.FullyQualifiedName() + " metricset is enabled.") } - log := logp.NewLogger(ModuleName) return &MetricSet{ base, http, config.XPack, - log, + logp.NewLogger(ModuleName), }, nil } diff --git a/metricbeat/module/elasticsearch/ml_job/data.go b/metricbeat/module/elasticsearch/ml_job/data.go index ae4d6cecd6b9..df86ebfe89ba 100644 --- a/metricbeat/module/elasticsearch/ml_job/data.go +++ b/metricbeat/module/elasticsearch/ml_job/data.go @@ -60,13 +60,15 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + event.MetricSetFields, err = schema.Apply(job) if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying ml job schema")) + event.Error = errors.Wrap(err, "failure applying ml job schema") + errs = append(errs, event.Error) } - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) r.Event(event) } return errs.Err() diff --git a/metricbeat/module/elasticsearch/ml_job/data_xpack.go b/metricbeat/module/elasticsearch/ml_job/data_xpack.go index bb0ecfb45e06..894c87c35d8e 100644 --- a/metricbeat/module/elasticsearch/ml_job/data_xpack.go +++ b/metricbeat/module/elasticsearch/ml_job/data_xpack.go @@ -22,6 +22,7 @@ import ( "fmt" "time" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -52,9 +53,11 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { return fmt.Errorf("jobs is not an array of maps") } + var errs multierror.Errors for _, job := range jobsArr { job, ok = job.(map[string]interface{}) if !ok { + errs = append(errs, fmt.Errorf("job is not a map")) continue } @@ -71,5 +74,5 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { r.Event(event) } - return nil + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/ml_job/ml_job.go b/metricbeat/module/elasticsearch/ml_job/ml_job.go index 5e04da8aab15..cc568747d9c3 100644 --- a/metricbeat/module/elasticsearch/ml_job/ml_job.go +++ b/metricbeat/module/elasticsearch/ml_job/ml_job.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -60,29 +60,31 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+jobPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch machine learning job stats from a non-master node.") + m.Log.Debug("trying to fetch machine learning job stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } if m.XPack { - eventsMappingXPack(r, m, content) + err = eventsMappingXPack(r, m, content) } else { err = eventsMapping(r, content) - if err != nil { - r.Error(err) - return - } + } + + if err != nil { + m.Log.Error(err) + return } } diff --git a/metricbeat/module/elasticsearch/node/data.go b/metricbeat/module/elasticsearch/node/data.go index d2d429a948bc..aa1991895774 100644 --- a/metricbeat/module/elasticsearch/node/data.go +++ b/metricbeat/module/elasticsearch/node/data.go @@ -77,20 +77,24 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { var errs multierror.Errors for name, node := range nodesStruct.Nodes { event := mb.Event{} + + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", nodesStruct.ClusterName) + event.MetricSetFields, err = schema.Apply(node) if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying node schema")) + event.Error = errors.Wrap(err, "failure applying node schema") + r.Event(event) + errs = append(errs, event.Error) + continue } // Write name here as full name only available as key event.MetricSetFields["name"] = name - event.ModuleFields = common.MapStr{} - event.ModuleFields.Put("cluster.name", nodesStruct.ClusterName) - - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) - r.Event(event) } diff --git a/metricbeat/module/elasticsearch/node/node.go b/metricbeat/module/elasticsearch/node/node.go index 7377f6777e6c..2642b66ba285 100644 --- a/metricbeat/module/elasticsearch/node/node.go +++ b/metricbeat/module/elasticsearch/node/node.go @@ -19,7 +19,7 @@ package node import ( "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/elasticsearch" @@ -34,44 +34,47 @@ func init() { ) } +const ( + // This only fetches data for the local node. + nodeStatsPath = "/_nodes/_local" +) + var ( hostParser = parse.URLHostParserBuilder{ DefaultScheme: "http", PathConfigKey: "path", - // This only fetches data for the local node. - DefaultPath: "_nodes/_local", }.Build() ) // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet - http *helper.HTTP + *elasticsearch.MetricSet } // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := elasticsearch.NewMetricSet(base, nodeStatsPath) if err != nil { return nil, err } - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) { - content, err := m.http.FetchContent() + content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - eventsMapping(r, content) + err = eventsMapping(r, content) + if err != nil { + elastic.ReportAndLogError(err, r, m.Log) + return + } } diff --git a/metricbeat/module/elasticsearch/node/node_test.go b/metricbeat/module/elasticsearch/node/node_test.go index be2243e91bbc..ec8c805253e6 100644 --- a/metricbeat/module/elasticsearch/node/node_test.go +++ b/metricbeat/module/elasticsearch/node/node_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/elasticsearch" ) func TestFetch(t *testing.T) { @@ -52,7 +53,7 @@ func TestFetch(t *testing.T) { defer server.Close() config := map[string]interface{}{ - "module": "elasticsearch", + "module": elasticsearch.ModuleName, "metricsets": []string{"node"}, "hosts": []string{server.URL}, } diff --git a/metricbeat/module/elasticsearch/node_stats/data.go b/metricbeat/module/elasticsearch/node_stats/data.go index f5a9f09b69ac..41423d7caa65 100644 --- a/metricbeat/module/elasticsearch/node_stats/data.go +++ b/metricbeat/module/elasticsearch/node_stats/data.go @@ -121,10 +121,8 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { for name, node := range nodeData.Nodes { event := mb.Event{} - event.MetricSetFields, err = schema.Apply(node) - if err != nil { - r.Error(errors.Wrap(err, "failure to apply node schema")) - } + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) event.ModuleFields = common.MapStr{ "node": common.MapStr{ @@ -134,8 +132,14 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { "name": nodeData.ClusterName, }, } - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.MetricSetFields, err = schema.Apply(node) + if err != nil { + event.Error = errors.Wrap(err, "failure to apply node schema") + r.Event(event) + errs = append(errs, event.Error) + } + r.Event(event) } return errs.Err() diff --git a/metricbeat/module/elasticsearch/node_stats/data_xpack.go b/metricbeat/module/elasticsearch/node_stats/data_xpack.go index df87b3f30b9f..6510fc3cfbfa 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_xpack.go +++ b/metricbeat/module/elasticsearch/node_stats/data_xpack.go @@ -22,25 +22,18 @@ import ( "time" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) var ( - sourceNodeXpack = s.Schema{ - "host": c.Str("host"), - "transport_address": c.Str("transport_address"), - "ip": c.Str("ip"), - "name": c.Str("name"), - } - schemaXpack = s.Schema{ "indices": c.Dict("indices", s.Schema{ "docs": c.Dict("docs", s.Schema{ @@ -186,29 +179,27 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { // it will provid the data for multiple nodes. This will mean the detection of the // master node will not be accurate anymore as often in these cases a proxy is in front // of ES and it's not know if the request will be routed to the same node as before. + var errs multierror.Errors for nodeID, node := range nodesStruct.Nodes { clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HTTP.GetURI(), nodeID) if err != nil { - logp.Err("could not fetch cluster id: %s", err) + errs = append(errs, errors.Wrap(err, "could not fetch cluster id")) continue } isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HTTP.GetURI()) if err != nil { - logp.Err("error determining if connected Elasticsearch node is master: %s", err) + errs = append(errs, errors.Wrap(err, "error determining if connected Elasticsearch node is master")) continue } + event := mb.Event{} - // Build source_node object - sourceNode, _ := sourceNodeXpack.Apply(node) - sourceNode["uuid"] = nodeID nodeData, err := schemaXpack.Apply(node) if err != nil { - logp.Err("failure to apply node schema: %s", err) + errs = append(errs, errors.Wrap(err, "failure to apply node schema")) continue } - nodeData["node_master"] = isMaster nodeData["node_id"] = nodeID @@ -217,12 +208,11 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { "cluster_uuid": clusterID, "interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000, "type": "node_stats", - "source_node": sourceNode, "node_stats": nodeData, } event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) r.Event(event) } - return nil + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/node_stats/node_stats.go b/metricbeat/module/elasticsearch/node_stats/node_stats.go index c250cb596245..c355fe51fd9c 100644 --- a/metricbeat/module/elasticsearch/node_stats/node_stats.go +++ b/metricbeat/module/elasticsearch/node_stats/node_stats.go @@ -19,6 +19,7 @@ package node_stats import ( "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -58,13 +59,21 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - if m.MetricSet.XPack { - eventsMappingXPack(r, m, content) + if m.XPack { + err = eventsMappingXPack(r, m, content) + if err != nil { + m.Log.Error(err) + return + } } else { - eventsMapping(r, content) + err = eventsMapping(r, content) + if err != nil { + elastic.ReportAndLogError(err, r, m.Log) + return + } } } diff --git a/metricbeat/module/elasticsearch/pending_tasks/data.go b/metricbeat/module/elasticsearch/pending_tasks/data.go index ae114366e5fc..d35207db8843 100644 --- a/metricbeat/module/elasticsearch/pending_tasks/data.go +++ b/metricbeat/module/elasticsearch/pending_tasks/data.go @@ -59,13 +59,16 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { var errs multierror.Errors for _, task := range tasksStruct.Tasks { event := mb.Event{} + + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + event.MetricSetFields, err = schema.Apply(task) if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying task schema")) + event.Error = errors.Wrap(err, "failure applying task schema") + errs = append(errs, event.Error) } - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) r.Event(event) } diff --git a/metricbeat/module/elasticsearch/pending_tasks/pending_tasks.go b/metricbeat/module/elasticsearch/pending_tasks/pending_tasks.go index a3ad788e671f..07f354586ba1 100644 --- a/metricbeat/module/elasticsearch/pending_tasks/pending_tasks.go +++ b/metricbeat/module/elasticsearch/pending_tasks/pending_tasks.go @@ -21,25 +21,23 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) // init registers the MetricSet with the central registry. // The New method will be called after the setup of the module and before starting to fetch data func init() { - mb.Registry.AddMetricSet(elasticsearch.ModuleName, "pending_tasks", New, hostParser) + mb.Registry.MustAddMetricSet(elasticsearch.ModuleName, "pending_tasks", New, + mb.WithHostParser(elasticsearch.HostParser), + mb.DefaultMetricSet(), + mb.WithNamespace("elasticsearch.pending_tasks"), + ) } -var ( - hostParser = parse.URLHostParserBuilder{ - DefaultScheme: "http", - PathConfigKey: "path", - DefaultPath: "_cluster/pending_tasks", - }.Build() +const ( + pendingTasksPath = "/_cluster/pending_tasks" ) // MetricSet holds any configuration or state information. It must implement @@ -47,45 +45,46 @@ var ( // mb.BaseMetricSet because it implements all of the required mb.MetricSet // interface methods except for Fetch. type MetricSet struct { - mb.BaseMetricSet - http *helper.HTTP + *elasticsearch.MetricSet } // New creates a new instance of the MetricSet. New is responsible for unpacking // any MetricSet specific configuration options if there are any. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta.") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") - http, err := helper.NewHTTP(base) + ms, err := elasticsearch.NewMetricSet(base, pendingTasksPath) if err != nil { return nil, err } - return &MetricSet{ - base, - http, - }, nil + return &MetricSet{MetricSet: ms}, nil } // Fetch methods implements the data gathering and data conversion to the right format func (m *MetricSet) Fetch(r mb.ReporterV2) { - isMaster, err := elasticsearch.IsMaster(m.http, m.HostData().SanitizedURI) + isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+pendingTasksPath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err := errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch pending tasks from a non-master node.") + m.Log.Debug("trying to fetch pending tasks from a non-master node") return } - content, err := m.http.FetchContent() + content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - eventsMapping(r, content) + err = eventsMapping(r, content) + if err != nil { + m.Log.Error(err) + return + } } diff --git a/metricbeat/module/elasticsearch/shard/data.go b/metricbeat/module/elasticsearch/shard/data.go index d9c12438bab9..c8ff543833a6 100644 --- a/metricbeat/module/elasticsearch/shard/data.go +++ b/metricbeat/module/elasticsearch/shard/data.go @@ -20,6 +20,7 @@ package shard import ( "encoding/json" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -61,32 +62,45 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { return err } + var errs multierror.Errors for _, index := range stateData.RoutingTable.Indices { for _, shards := range index.Shards { for _, shard := range shards { event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", elasticsearch.ModuleName) + + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.state.id", stateData.StateID) + event.ModuleFields.Put("cluster.name", stateData.ClusterName) + fields, err := schema.Apply(shard) if err != nil { - r.Error(errors.Wrap(err, "failure applying shard schema")) + event.Error = errors.Wrap(err, "failure applying shard schema") + r.Event(event) + errs = append(errs, event.Error) continue } // Handle node field: could be string or null err = elasticsearch.PassThruField("node", shard, fields) if err != nil { - r.Error(errors.Wrap(err, "failure passing through node field")) + event.Error = errors.Wrap(err, "failure passing through node field") + r.Event(event) + errs = append(errs, event.Error) continue } // Handle relocating_node field: could be string or null err = elasticsearch.PassThruField("relocating_node", shard, fields) if err != nil { - r.Error(errors.Wrap(err, "failure passing through relocating_node field")) + event.Error = errors.Wrap(err, "failure passing through relocating_node field") + r.Event(event) + errs = append(errs, event.Error) continue } - event.ModuleFields = common.MapStr{} event.ModuleFields.Put("node.name", fields["node"]) delete(fields, "node") @@ -100,15 +114,9 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { delete(event.MetricSetFields, "relocating_node") event.MetricSetFields.Put("relocating_node.name", fields["relocating_node"]) - event.ModuleFields.Put("cluster.state.id", stateData.StateID) - event.ModuleFields.Put("cluster.name", stateData.ClusterName) - - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", elasticsearch.ModuleName) - r.Event(event) } } } - return nil + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/shard/data_xpack.go b/metricbeat/module/elasticsearch/shard/data_xpack.go index 57b6804c9ab5..bf7b443fd599 100644 --- a/metricbeat/module/elasticsearch/shard/data_xpack.go +++ b/metricbeat/module/elasticsearch/shard/data_xpack.go @@ -22,6 +22,7 @@ import ( "strconv" "time" + "github.com/joeshaw/multierror" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -44,24 +45,28 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { return errors.Wrap(err, "failed to get cluster ID from Elasticsearch") } + var errs multierror.Errors for _, index := range stateData.RoutingTable.Indices { for _, shards := range index.Shards { for _, shard := range shards { event := mb.Event{} fields, err := schema.Apply(shard) if err != nil { + errs = append(errs, errors.Wrap(err, "failure to apply shard schema")) continue } // Handle node field: could be string or null err = elasticsearch.PassThruField("node", shard, fields) if err != nil { + errs = append(errs, errors.Wrap(err, "failure passing through node field")) continue } // Handle relocating_node field: could be string or null err = elasticsearch.PassThruField("relocating_node", shard, fields) if err != nil { + errs = append(errs, errors.Wrap(err, "failure passing through relocating_node field")) continue } @@ -82,6 +87,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { if nodeID != nil { // shard has not been allocated yet sourceNode, err := getSourceNode(nodeID.(string), stateData) if err != nil { + errs = append(errs, errors.Wrap(err, "failure getting source node information")) continue } event.RootFields.Put("source_node", sourceNode) @@ -89,17 +95,16 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { event.ID, err = getEventID(stateData.StateID, fields) if err != nil { + errs = append(errs, errors.Wrap(err, "failure getting event ID")) continue } event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) - r.Event(event) - } } } - return nil + return errs.Err() } func getSourceNode(nodeID string, stateData *stateStruct) (common.MapStr, error) { diff --git a/metricbeat/module/elasticsearch/shard/shard.go b/metricbeat/module/elasticsearch/shard/shard.go index 2b7ed012594f..6b96e42d6a1a 100644 --- a/metricbeat/module/elasticsearch/shard/shard.go +++ b/metricbeat/module/elasticsearch/shard/shard.go @@ -21,7 +21,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) @@ -45,7 +45,7 @@ type MetricSet struct { // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") // Get the stats from the local node ms, err := elasticsearch.NewMetricSet(base, statePath) @@ -59,25 +59,31 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statePath) if err != nil { - r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master")) + err := errors.Wrap(err, "error determining if connected Elasticsearch node is master") + elastic.ReportAndLogError(err, r, m.Log) return } // Not master, no event sent if !isMaster { - logp.Debug(elasticsearch.ModuleName, "Trying to fetch shard stats from a non master node.") + m.Log.Debug("trying to fetch shard stats from a non-master node") return } content, err := m.HTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } if m.XPack { - eventsMappingXPack(r, m, content) + err = eventsMappingXPack(r, m, content) } else { - eventsMapping(r, content) + err = eventsMapping(r, content) + } + + if err != nil { + m.Log.Error(err) + return } } diff --git a/metricbeat/module/kibana/metricset.go b/metricbeat/module/kibana/metricset.go new file mode 100644 index 000000000000..031c96a929b3 --- /dev/null +++ b/metricbeat/module/kibana/metricset.go @@ -0,0 +1,55 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package kibana + +import ( + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + +// MetricSet can be used to build other metricsets within the Kibana module. +type MetricSet struct { + mb.BaseMetricSet + XPackEnabled bool + Log *logp.Logger +} + +// NewMetricSet creates a metricset that can be used to build other metricsets +// within the Kibana module. +func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { + config := struct { + xPackEnabled bool `config:"xpack.enabled"` + }{ + xPackEnabled: false, + } + + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + if config.xPackEnabled { + cfgwarn.Experimental("The experimental xpack.enabled flag in the " + base.FullyQualifiedName() + " metricset is enabled.") + } + + return &MetricSet{ + base, + config.xPackEnabled, + logp.NewLogger(ModuleName), + }, nil +} diff --git a/metricbeat/module/kibana/stats/data.go b/metricbeat/module/kibana/stats/data.go index 35e01616b461..8f494e8596c9 100644 --- a/metricbeat/module/kibana/stats/data.go +++ b/metricbeat/module/kibana/stats/data.go @@ -93,7 +93,9 @@ func eventMapping(r mb.ReporterV2, content []byte) error { dataFields, err := schema.Apply(data) if err != nil { - r.Error(errors.Wrap(err, "failure to apply stats schema")) + err = errors.Wrap(err, "failure to apply stats schema") + r.Error(err) + return err } var event mb.Event @@ -103,18 +105,24 @@ func eventMapping(r mb.ReporterV2, content []byte) error { // Set elasticsearch cluster id elasticsearchClusterID, ok := data["cluster_uuid"] if !ok { - return elastic.ReportErrorForMissingField("cluster_uuid", elastic.Kibana, r) + event.Error = elastic.MakeErrorForMissingField("cluster_uuid", elastic.Kibana) + r.Event(event) + return event.Error } event.RootFields.Put("elasticsearch.cluster.id", elasticsearchClusterID) // Set process PID process, ok := data["process"].(map[string]interface{}) if !ok { - return elastic.ReportErrorForMissingField("process", elastic.Kibana, r) + event.Error = elastic.MakeErrorForMissingField("process", elastic.Kibana) + r.Event(event) + return event.Error } pid, ok := process["pid"].(float64) if !ok { - return elastic.ReportErrorForMissingField("process.pid", elastic.Kibana, r) + event.Error = elastic.MakeErrorForMissingField("process.pid", elastic.Kibana) + r.Event(event) + return event.Error } event.RootFields.Put("process.pid", int(pid)) diff --git a/metricbeat/module/kibana/stats/stats.go b/metricbeat/module/kibana/stats/stats.go index 103f5be8b72c..674be4e53372 100644 --- a/metricbeat/module/kibana/stats/stats.go +++ b/metricbeat/module/kibana/stats/stats.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/kibana" @@ -52,18 +53,17 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *kibana.MetricSet statsHTTP *helper.HTTP settingsHTTP *helper.HTTP - xPackEnabled bool } // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The " + base.FullyQualifiedName() + " metricset is experimental") - config := kibana.DefaultConfig() - if err := base.Module().UnpackConfig(&config); err != nil { + ms, err := kibana.NewMetricSet(base) + if err != nil { return nil, err } @@ -87,16 +87,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, fmt.Errorf(errorMsg, base.FullyQualifiedName(), kibana.StatsAPIAvailableVersion, kibanaVersion) } - if config.XPackEnabled { - cfgwarn.Experimental("The experimental xpack.enabled flag in the " + base.FullyQualifiedName() + " metricset is enabled.") + if ms.XPackEnabled { + cfgwarn.Experimental("The experimental xpack.enabled flag in the " + ms.FullyQualifiedName() + " metricset is enabled.") // Use legacy API response so we can passthru usage as-is statsHTTP.SetURI(statsHTTP.GetURI() + "&legacy=true") } var settingsHTTP *helper.HTTP - if config.XPackEnabled { - cfgwarn.Experimental("The experimental xpack.enabled flag in the " + base.FullyQualifiedName() + " metricset is enabled.") + if ms.XPackEnabled { + cfgwarn.Experimental("The experimental xpack.enabled flag in the " + ms.FullyQualifiedName() + " metricset is enabled.") isSettingsAPIAvailable, err := kibana.IsSettingsAPIAvailable(kibanaVersion) if err != nil { @@ -105,7 +105,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if !isSettingsAPIAvailable { const errorMsg = "The %v metricset with X-Pack enabled is only supported with Kibana >= %v. You are currently running Kibana %v" - return nil, fmt.Errorf(errorMsg, base.FullyQualifiedName(), kibana.SettingsAPIAvailableVersion, kibanaVersion) + return nil, fmt.Errorf(errorMsg, ms.FullyQualifiedName(), kibana.SettingsAPIAvailableVersion, kibanaVersion) } settingsHTTP, err = helper.NewHTTP(base) @@ -120,10 +120,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } return &MetricSet{ - base, + ms, statsHTTP, settingsHTTP, - config.XPackEnabled, }, nil } @@ -134,7 +133,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { now := time.Now() m.fetchStats(r, now) - if m.xPackEnabled { + if m.XPackEnabled { m.fetchSettings(r, now) } } @@ -142,26 +141,39 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) { content, err := m.statsHTTP.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - if m.xPackEnabled { + if m.XPackEnabled { intervalMs := m.calculateIntervalMs() - eventMappingStatsXPack(r, intervalMs, now, content) + err = eventMappingStatsXPack(r, intervalMs, now, content) + if err != nil { + m.Log.Error(err) + return + } } else { - eventMapping(r, content) + err = eventMapping(r, content) + if err != nil { + elastic.ReportAndLogError(err, r, m.Log) + return + } } } func (m *MetricSet) fetchSettings(r mb.ReporterV2, now time.Time) { content, err := m.settingsHTTP.FetchContent() if err != nil { + m.Log.Error(err) return } intervalMs := m.calculateIntervalMs() - eventMappingSettingsXPack(r, intervalMs, now, content) + err = eventMappingSettingsXPack(r, intervalMs, now, content) + if err != nil { + m.Log.Error(err) + return + } } func (m *MetricSet) calculateIntervalMs() int64 { diff --git a/metricbeat/module/kibana/status/data.go b/metricbeat/module/kibana/status/data.go index 5e2e6eb46793..2c498488f2b2 100644 --- a/metricbeat/module/kibana/status/data.go +++ b/metricbeat/module/kibana/status/data.go @@ -51,28 +51,26 @@ var ( } ) -type OverallMetrics struct { - Metrics map[string][][]uint64 -} - func eventMapping(r mb.ReporterV2, content []byte) error { + var event mb.Event + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", kibana.ModuleName) + var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Kibana Stats API response") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure parsing Kibana Status API response") + r.Event(event) + return event.Error } dataFields, err := schema.Apply(data) if err != nil { - r.Error(errors.Wrap(err, "failure to apply stats schema")) + event.Error = errors.Wrap(err, "failure to apply status schema") + r.Event(event) + return event.Error } - var event mb.Event - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", kibana.ModuleName) - event.MetricSetFields = dataFields r.Event(event) diff --git a/metricbeat/module/kibana/status/status.go b/metricbeat/module/kibana/status/status.go index d5a3be7442b4..4f5c00f2851a 100644 --- a/metricbeat/module/kibana/status/status.go +++ b/metricbeat/module/kibana/status/status.go @@ -20,6 +20,7 @@ package status import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/kibana" @@ -44,7 +45,7 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *kibana.MetricSet http *helper.HTTP } @@ -52,12 +53,18 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + ms, err := kibana.NewMetricSet(base) + if err != nil { + return nil, err + } + http, err := helper.NewHTTP(base) if err != nil { return nil, err } + return &MetricSet{ - base, + ms, http, }, nil } @@ -68,10 +75,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { content, err := m.http.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - eventMapping(r, content) - return + err = eventMapping(r, content) + + if err != nil { + m.Log.Error(err) + return + } } diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index 5590ca03d645..e1a35d67caa9 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -17,5 +17,25 @@ package logstash +import ( + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/metricbeat/mb" +) + // ModuleName is the name of this module. const ModuleName = "logstash" + +// MetricSet can be used to build other metricsets within the Logstash module. +type MetricSet struct { + mb.BaseMetricSet + Log *logp.Logger +} + +// NewMetricSet creates a metricset that can be used to build other metricsets +// within the Logstash module. +func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) { + return &MetricSet{ + base, + logp.NewLogger(ModuleName), + }, nil +} diff --git a/metricbeat/module/logstash/node/data.go b/metricbeat/module/logstash/node/data.go index 999e27198b0f..e3493baa8269 100644 --- a/metricbeat/module/logstash/node/data.go +++ b/metricbeat/module/logstash/node/data.go @@ -41,23 +41,25 @@ var ( ) func eventMapping(r mb.ReporterV2, content []byte) error { + event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", logstash.ModuleName) + var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Logstash Node API response") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure parsing Logstash Node API response") + r.Event(event) + return event.Error } fields, err := schema.Apply(data) if err != nil { - r.Error(errors.Wrap(err, "failure applying node schema")) + event.Error = errors.Wrap(err, "failure applying node schema") + r.Event(event) + return event.Error } - event := mb.Event{} - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", logstash.ModuleName) - event.MetricSetFields = fields r.Event(event) diff --git a/metricbeat/module/logstash/node/data_test.go b/metricbeat/module/logstash/node/data_test.go index 85a191892d1c..10c77d39377a 100644 --- a/metricbeat/module/logstash/node/data_test.go +++ b/metricbeat/module/logstash/node/data_test.go @@ -35,11 +35,11 @@ func TestEventMapping(t *testing.T) { assert.NoError(t, err) for _, f := range files { - content, err := ioutil.ReadFile(f) + input, err := ioutil.ReadFile(f) assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - err = eventMapping(reporter, content) + err = eventMapping(reporter, input) assert.NoError(t, err, f) assert.True(t, len(reporter.GetEvents()) >= 1, f) diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index ccd7224ced7f..a841788d8749 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -20,6 +20,7 @@ package node import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -44,19 +45,26 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *logstash.MetricSet http *helper.HTTP } // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The logstash node metricset is beta") + cfgwarn.Beta("The " + base.FullyQualifiedName() + " metricset is beta") + + ms, err := logstash.NewMetricSet(base) + if err != nil { + return nil, err + } + http, err := helper.NewHTTP(base) if err != nil { return nil, err } + return &MetricSet{ - base, + ms, http, }, nil } @@ -67,9 +75,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { content, err := m.http.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - eventMapping(r, content) + err = eventMapping(r, content) + if err != nil { + m.Log.Error(err) + return + } } diff --git a/metricbeat/module/logstash/node/node_integration_test.go b/metricbeat/module/logstash/node/node_integration_test.go index 962e174bec0e..2cf56681fdc6 100644 --- a/metricbeat/module/logstash/node/node_integration_test.go +++ b/metricbeat/module/logstash/node/node_integration_test.go @@ -22,11 +22,11 @@ package node import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/tests/compose" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/beats/metricbeat/module/logstash" - - "github.com/stretchr/testify/assert" ) func TestFetch(t *testing.T) { diff --git a/metricbeat/module/logstash/node_stats/data.go b/metricbeat/module/logstash/node_stats/data.go index ab59bb6085eb..0603c35ab49f 100644 --- a/metricbeat/module/logstash/node_stats/data.go +++ b/metricbeat/module/logstash/node_stats/data.go @@ -40,23 +40,25 @@ var ( ) func eventMapping(r mb.ReporterV2, content []byte) error { + event := mb.Event{} + event.RootFields = common.MapStr{} + event.RootFields.Put("service.name", logstash.ModuleName) + var data map[string]interface{} err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Logstash Node Stats API response") - r.Error(err) - return err + event.Error = errors.Wrap(err, "failure parsing Logstash Node Stats API response") + r.Event(event) + return event.Error } fields, err := schema.Apply(data) if err != nil { - r.Error(errors.Wrap(err, "failure applying node stats schema")) + event.Error = errors.Wrap(err, "failure applying node stats schema") + r.Event(event) + return event.Error } - event := mb.Event{} - event.RootFields = common.MapStr{} - event.RootFields.Put("service.name", logstash.ModuleName) - event.MetricSetFields = fields r.Event(event) diff --git a/metricbeat/module/logstash/node_stats/data_test.go b/metricbeat/module/logstash/node_stats/data_test.go index 7bfc4b9ef98d..e2c13beddcaa 100644 --- a/metricbeat/module/logstash/node_stats/data_test.go +++ b/metricbeat/module/logstash/node_stats/data_test.go @@ -35,11 +35,11 @@ func TestEventMapping(t *testing.T) { assert.NoError(t, err) for _, f := range files { - content, err := ioutil.ReadFile(f) + input, err := ioutil.ReadFile(f) assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - err = eventMapping(reporter, content) + err = eventMapping(reporter, input) assert.NoError(t, err, f) assert.True(t, len(reporter.GetEvents()) >= 1, f) diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 80ccc1b23505..e254d559ee90 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -20,22 +20,19 @@ package node_stats import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/helper" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" ) -const ( - metricsetName = "node_stats" - namespace = "logstash.node.stats" -) - // init registers the MetricSet with the central registry. // The New method will be called after the setup of the module and before starting to fetch data func init() { - mb.Registry.MustAddMetricSet(logstash.ModuleName, metricsetName, New, + mb.Registry.MustAddMetricSet(logstash.ModuleName, "node_stats", New, mb.WithHostParser(hostParser), - mb.WithNamespace(namespace), + mb.WithNamespace("logstash.node.stats"), mb.DefaultMetricSet(), ) } @@ -50,20 +47,25 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { - mb.BaseMetricSet + *logstash.MetricSet http *helper.HTTP } // New create a new instance of the MetricSet func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Beta("The logstash node_stats metricset is beta") + cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta") + + ms, err := logstash.NewMetricSet(base) + if err != nil { + return nil, err + } http, err := helper.NewHTTP(base) if err != nil { return nil, err } return &MetricSet{ - base, + ms, http, }, nil } @@ -74,9 +76,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (m *MetricSet) Fetch(r mb.ReporterV2) { content, err := m.http.FetchContent() if err != nil { - r.Error(err) + elastic.ReportAndLogError(err, r, m.Log) return } - eventMapping(r, content) + err = eventMapping(r, content) + if err != nil { + m.Log.Error(err) + return + } } diff --git a/metricbeat/module/logstash/node_stats/node_stats_integration_test.go b/metricbeat/module/logstash/node_stats/node_stats_integration_test.go index 7982c2fdf088..3e3839405d91 100644 --- a/metricbeat/module/logstash/node_stats/node_stats_integration_test.go +++ b/metricbeat/module/logstash/node_stats/node_stats_integration_test.go @@ -22,11 +22,11 @@ package node_stats import ( "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/tests/compose" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/elastic/beats/metricbeat/module/logstash" - - "github.com/stretchr/testify/assert" ) func TestFetch(t *testing.T) { diff --git a/metricbeat/module/logstash/testing.go b/metricbeat/module/logstash/testing.go index 221b280bda04..0f76fada202f 100644 --- a/metricbeat/module/logstash/testing.go +++ b/metricbeat/module/logstash/testing.go @@ -42,7 +42,7 @@ func GetEnvPort() string { // GetConfig for Logstash func GetConfig(metricset string) map[string]interface{} { return map[string]interface{}{ - "module": "logstash", + "module": ModuleName, "metricsets": []string{metricset}, "hosts": []string{GetEnvHost() + ":" + GetEnvPort()}, }