Skip to content

Commit

Permalink
[Heartbeat] Fix broken data_stream assignment (#27535)
Browse files Browse the repository at this point in the history
This PR fixes the logic behind assigning the data_steam.dataset field.

Previously this was static per monitor type, only the index would change, but not the field. This makes the processor more comprehensive handling not just index naming but field generation.

This is a breaking change, setting event.dataset consistently to the monitor type rather than the old value of uptime. Since this field is unlikely to have been used by anyone the impact is low, and it will bring us inline with the ECS spec requiring this field to match event.dataset.

(cherry picked from commit 12df9f7)
  • Loading branch information
andrewvc authored and mergify-bot committed Aug 25, 2021
1 parent 0aed715 commit db248fa
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 109 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add option for S3 input to work without SQS notification {issue}18205[18205] {pull}27332[27332]

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
- Fix inconsistency in `event.dataset` values between heartbeat and fleet by always setting this value to the monitor type / fleet dataset. {pull}27535[27535]

*Journalbeat*

Expand Down
1 change: 1 addition & 0 deletions heartbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package config
1 change: 1 addition & 0 deletions heartbeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build mage
// +build mage

package main
Expand Down
86 changes: 46 additions & 40 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package monitors

import (
"fmt"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
Expand All @@ -26,7 +28,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)
Expand All @@ -52,10 +55,10 @@ type publishSettings struct {
KeepNull bool `config:"keep_null"`

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *add_data_stream_index.DataStream `config:"data_stream"`
DataSet string `config:"dataset"`
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *add_data_stream.DataStream `config:"data_stream"`
DataSet string `config:"dataset"`
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
Expand Down Expand Up @@ -91,12 +94,13 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
return nil, err
}

stdFields, err := stdfields.ConfigToStdMonitorFields(cfg)
sf, err := stdfields.ConfigToStdMonitorFields(cfg)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not parse cfg for datastream %w", err)
}

indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type)
// Early stage processors for setting data_stream, event.dataset, and index to write to
preProcs, err := preProcessors(info, settings, sf.Type)
if err != nil {
return nil, err
}
Expand All @@ -106,40 +110,20 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
return nil, err
}

// TODO: Remove this logic in the 8.0/master branch, preserve only in 7.x
dataset := settings.DataSet
if dataset == "" {
if settings.DataStream != nil && settings.DataStream.Dataset != "" {
dataset = settings.DataStream.Dataset
} else {
dataset = "uptime"
}
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
fields := clientCfg.Processing.Fields.Clone()
fields.Put("event.dataset", dataset)

if settings.DataStream != nil {
fields.Put("data_stream", settings.DataStream)
}

meta := clientCfg.Processing.Meta.Clone()
if settings.Pipeline != "" {
meta.Put("pipeline", settings.Pipeline)
}

// assemble the processors. Ordering is important.
// 1. add support for index configuration via processor
// 2. add processors added by the input that wants to connect
// 3. add locally configured processors from the 'processors' settings
procs := processors.NewList(nil)
if indexProcessor != nil {
procs.AddProcessor(indexProcessor)
}

if lst := clientCfg.Processing.Processor; lst != nil {
procs.AddProcessor(lst)
}
procs.AddProcessors(*preProcs)
if userProcessors != nil {
procs.AddProcessors(*userProcessors)
}
Expand All @@ -155,28 +139,50 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
}, nil
}

func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) {
var indexProcessor processors.Processor
// preProcessors sets up the required event.dataset, data_stream.*, and write index processors for future event publishes.
func preProcessors(info beat.Info, settings publishSettings, monitorType string) (procs *processors.Processors, err error) {
procs = processors.NewList(nil)

var dataset string
if settings.DataStream != nil && settings.DataStream.Dataset != "" {
dataset = settings.DataStream.Dataset
} else {
dataset = monitorType
}

// Always set event.dataset
procs.AddProcessor(actions.NewAddFields(common.MapStr{"event": common.MapStr{"dataset": dataset}}, true, true))

if settings.DataStream != nil {
ds := settings.DataStream
ds := *settings.DataStream
if ds.Type == "" {
ds.Type = "synthetics"
}
if ds.Dataset == "" {
ds.Dataset = dataset
}
return add_data_stream_index.New(*ds), nil

procs.AddProcessor(add_data_stream.New(ds))
}

if !settings.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version)

timestampFormat, err :=
fmtstr.NewTimestampFormatString(&settings.Index, staticFields)
proc, err := indexProcessor(&settings.Index, info)
if err != nil {
return nil, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
procs.AddProcessor(proc)
}

return procs, nil
}

func indexProcessor(index *fmtstr.EventFormatString, info beat.Info) (beat.Processor, error) {
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version)

timestampFormat, err :=
fmtstr.NewTimestampFormatString(index, staticFields)
if err != nil {
return nil, err
}
return indexProcessor, nil
return add_formatted_index.New(timestampFormat), nil
}
70 changes: 56 additions & 14 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitors

import (
"regexp"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -26,54 +27,67 @@ import (
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

func TestSetupIndexProcessor(t *testing.T) {
func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
monitorType string
wantProc bool
wantErr bool
settings publishSettings
expectedIndex string
expectedDatastream *add_data_stream.DataStream
monitorType string
wantProc bool
wantErr bool
}{
"no settings should yield no processor": {
publishSettings{},
"",
nil,
"browser",
false,
false,
},
"exact index should be used exactly": {
publishSettings{Index: *fmtstr.MustCompileEvent("test")},
"test",
nil,
"browser",
true,
false,
},
"data stream should be type-namespace-dataset": {
publishSettings{
DataStream: &add_data_stream_index.DataStream{
DataStream: &add_data_stream.DataStream{
Namespace: "myNamespace",
Dataset: "myDataset",
Type: "myType",
},
},
"myType-myDataset-myNamespace",
&add_data_stream.DataStream{
Namespace: "myNamespace",
Dataset: "myDataset",
Type: "myType",
},
"myType",
true,
false,
},
"data stream should use defaults": {
publishSettings{
DataStream: &add_data_stream_index.DataStream{},
DataStream: &add_data_stream.DataStream{},
},
"synthetics-browser-default",
&add_data_stream.DataStream{
Namespace: "default",
Dataset: "browser",
Type: "synthetics",
},
"browser",
true,
false,
Expand All @@ -83,21 +97,49 @@ func TestSetupIndexProcessor(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}}
proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType)
procs, err := preProcessors(binfo, tt.settings, tt.monitorType)
if tt.wantErr == true {
require.Error(t, err)
return
}
require.NoError(t, err)

// If we're just setting event.dataset we only get the 1
// otherwise we get a second add_data_stream processor
if !tt.wantProc {
require.Nil(t, proc)
require.Len(t, procs.List, 1)
return
}
require.Len(t, procs.List, 2)

_, err = procs.Run(&e)

t.Run("index name should be set", func(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex])
})

eventDs, err := e.GetValue("event.dataset")
require.NoError(t, err)

t.Run("event.dataset should always be present, preferring data_stream", func(t *testing.T) {
dataset := tt.monitorType
if tt.settings.DataStream != nil && tt.settings.DataStream.Dataset != "" {
dataset = tt.settings.DataStream.Dataset
}
require.Equal(t, dataset, eventDs, "event.dataset be computed correctly")
require.Regexp(t, regexp.MustCompile(`^.+`), eventDs, "should be a string > 1 char")
})

t.Run("event.data_stream", func(t *testing.T) {
dataStreamRaw, _ := e.GetValue("data_stream")
if tt.expectedDatastream != nil {
dataStream := dataStreamRaw.(add_data_stream.DataStream)
require.Equal(t, eventDs, dataStream.Dataset, "event.dataset be identical to data_stream.dataset")

require.NotNil(t, proc)
_, err = proc.Run(&e)
require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex])
require.Equal(t, *tt.expectedDatastream, dataStream)
}
})
})
}
}
2 changes: 1 addition & 1 deletion heartbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,6 @@ def test_dataset(self):
for output in self.read_output():
self.assertEqual(
output["event.dataset"],
"uptime",
output["monitor.type"],
"Check for event.dataset in {} failed".format(output)
)
Loading

0 comments on commit db248fa

Please sign in to comment.