Skip to content

Commit

Permalink
Better error messaging for logstash/node_stats metricset
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Oct 4, 2018
1 parent 8504eda commit 357f9e6
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 19 deletions.
1 change: 1 addition & 0 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

// ModuleName is the name of this module.
Expand Down
25 changes: 22 additions & 3 deletions metricbeat/module/logstash/node_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ package node_stats
import (
"encoding/json"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/logstash"
)

var (
Expand All @@ -35,11 +39,26 @@ var (
}
)

func eventMapping(content []byte) (common.MapStr, error) {
func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return nil, err
err = errors.Wrap(err, "failure parsing Logstash Node Stats API response")
r.Error(err)
return err
}

fields, err := schema.Apply(data)
if err != nil {
r.Error(errors.Wrap(err, "failure applying node stats schema"))
}
return schema.Apply(data)

event := mb.Event{}
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", logstash.ModuleName)

event.MetricSetFields = fields

r.Event(event)
return nil
}
8 changes: 6 additions & 2 deletions metricbeat/module/logstash/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"path/filepath"
"testing"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"

"github.com/stretchr/testify/assert"
)

Expand All @@ -36,9 +38,11 @@ func TestEventMapping(t *testing.T) {
content, err := ioutil.ReadFile(f)
assert.NoError(t, err)

event, err := eventMapping(content)
reporter := &mbtest.CapturingReporterV2{}
err = eventMapping(reporter, content)

assert.NoError(t, err, f)
assert.True(t, len(event) >= 1, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
}
}
13 changes: 6 additions & 7 deletions metricbeat/module/logstash/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@
package node_stats

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/logstash"
)

const (
moduleName = "logstash"
metricsetName = "node_stats"
namespace = "logstash.node.stats"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
mb.Registry.MustAddMetricSet(moduleName, metricsetName, New,
mb.Registry.MustAddMetricSet(logstash.ModuleName, metricsetName, New,
mb.WithHostParser(hostParser),
mb.WithNamespace(namespace),
mb.DefaultMetricSet(),
Expand Down Expand Up @@ -72,12 +71,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right format
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch() (common.MapStr, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
if err != nil {
return nil, err
r.Error(err)
return
}

event, _ := eventMapping(content)
return event, nil
eventMapping(r, content)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,24 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "logstash")

f := mbtest.NewEventFetcher(t, logstash.GetConfig("node_stats"))
event, err := f.Fetch()
if !assert.NoError(t, err) {
f := mbtest.NewReportingMetricSetV2(t, logstash.GetConfig("node_stats"))
events, errs := mbtest.ReportingFetchV2(f)

assert.Empty(t, errs)
if !assert.NotEmpty(t, events) {
t.FailNow()
}

assert.NotNil(t, event)
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(),
events[0].BeatEvent("logstash", "node_stats").Fields.StringToPrint())
}

func TestData(t *testing.T) {
compose.EnsureUp(t, "logstash")

f := mbtest.NewEventFetcher(t, logstash.GetConfig("node_stats"))
err := mbtest.WriteEvent(f, t)
config := logstash.GetConfig("node_stats")
f := mbtest.NewReportingMetricSetV2(t, config)
err := mbtest.WriteEventsReporterV2(f, t, "")
if err != nil {
t.Fatal("write", err)
}
Expand Down

0 comments on commit 357f9e6

Please sign in to comment.