Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix broken data_stream assignment #27535

Merged
merged 16 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*
- Remove long deprecated `watch_poll` functionality. {pull}27166[27166]
- Fix inconsistency in `event.dataset` values between heartbeat and fleet by always setting this value to the monitor type / fleet dataset. {pull}27535[27535]

*Journalbeat*

Expand Down
1 change: 1 addition & 0 deletions heartbeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package config
1 change: 1 addition & 0 deletions heartbeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build mage
// +build mage

package main
Expand Down
86 changes: 46 additions & 40 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package monitors

import (
"fmt"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
Expand All @@ -26,7 +28,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
"github.com/elastic/beats/v7/libbeat/processors/actions"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)
Expand All @@ -51,10 +54,10 @@ type publishSettings struct {
KeepNull bool `config:"keep_null"`

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *add_data_stream_index.DataStream `config:"data_stream"`
DataSet string `config:"dataset"`
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataStream *add_data_stream.DataStream `config:"data_stream"`
DataSet string `config:"dataset"`
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
Expand Down Expand Up @@ -90,12 +93,13 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
return nil, err
}

stdFields, err := stdfields.ConfigToStdMonitorFields(cfg)
sf, err := stdfields.ConfigToStdMonitorFields(cfg)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not parse cfg for datastream %w", err)
}

indexProcessor, err := setupIndexProcessor(info, settings, stdFields.Type)
// Early stage processors for setting data_stream, event.dataset, and index to write to
preProcs, err := preProcessors(info, settings, sf.Type)
if err != nil {
return nil, err
}
Expand All @@ -105,40 +109,20 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
return nil, err
}

// TODO: Remove this logic in the 8.0/master branch, preserve only in 7.x
dataset := settings.DataSet
if dataset == "" {
if settings.DataStream != nil && settings.DataStream.Dataset != "" {
dataset = settings.DataStream.Dataset
} else {
dataset = "uptime"
}
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
fields := clientCfg.Processing.Fields.Clone()
fields.Put("event.dataset", dataset)

if settings.DataStream != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think i added this as a fix for Heartbeat not recognizing the data stream when used inside agent - #26774.

I hope it still works without this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved the logic to the add_data_stream processor. So, it's still running.

fields.Put("data_stream", settings.DataStream)
}

meta := clientCfg.Processing.Meta.Clone()
if settings.Pipeline != "" {
meta.Put("pipeline", settings.Pipeline)
}

// assemble the processors. Ordering is important.
// 1. add support for index configuration via processor
// 2. add processors added by the input that wants to connect
// 3. add locally configured processors from the 'processors' settings
procs := processors.NewList(nil)
if indexProcessor != nil {
procs.AddProcessor(indexProcessor)
}

if lst := clientCfg.Processing.Processor; lst != nil {
procs.AddProcessor(lst)
}
procs.AddProcessors(*preProcs)
if userProcessors != nil {
procs.AddProcessors(*userProcessors)
}
Expand All @@ -154,28 +138,50 @@ func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.Confi
}, nil
}

func setupIndexProcessor(info beat.Info, settings publishSettings, dataset string) (processors.Processor, error) {
var indexProcessor processors.Processor
// preProcessors sets up the required event.dataset, data_stream.*, and write index processors for future event publishes.
func preProcessors(info beat.Info, settings publishSettings, monitorType string) (procs *processors.Processors, err error) {
procs = processors.NewList(nil)

var dataset string
if settings.DataStream != nil && settings.DataStream.Dataset != "" {
dataset = settings.DataStream.Dataset
} else {
dataset = monitorType
}

// Always set event.dataset
procs.AddProcessor(actions.NewAddFields(common.MapStr{"event": common.MapStr{"dataset": dataset}}, true, true))

if settings.DataStream != nil {
ds := settings.DataStream
ds := *settings.DataStream
if ds.Type == "" {
ds.Type = "synthetics"
}
if ds.Dataset == "" {
ds.Dataset = dataset
}
return add_data_stream_index.New(*ds), nil

procs.AddProcessor(add_data_stream.New(ds))
}

if !settings.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version)

timestampFormat, err :=
fmtstr.NewTimestampFormatString(&settings.Index, staticFields)
proc, err := indexProcessor(&settings.Index, info)
if err != nil {
return nil, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
procs.AddProcessor(proc)
}

return procs, nil
}

func indexProcessor(index *fmtstr.EventFormatString, info beat.Info) (beat.Processor, error) {
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version)

timestampFormat, err :=
fmtstr.NewTimestampFormatString(index, staticFields)
if err != nil {
return nil, err
}
return indexProcessor, nil
return add_formatted_index.New(timestampFormat), nil
}
70 changes: 56 additions & 14 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitors

import (
"regexp"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -26,54 +27,67 @@ import (
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream_index"
"github.com/elastic/beats/v7/libbeat/processors/add_data_stream"
)

func TestSetupIndexProcessor(t *testing.T) {
func TestPreProcessors(t *testing.T) {
binfo := beat.Info{
Beat: "heartbeat",
IndexPrefix: "heartbeat",
Version: "8.0.0",
}
tests := map[string]struct {
settings publishSettings
expectedIndex string
monitorType string
wantProc bool
wantErr bool
settings publishSettings
expectedIndex string
expectedDatastream *add_data_stream.DataStream
monitorType string
wantProc bool
wantErr bool
}{
"no settings should yield no processor": {
publishSettings{},
"",
nil,
"browser",
false,
false,
},
"exact index should be used exactly": {
publishSettings{Index: *fmtstr.MustCompileEvent("test")},
"test",
nil,
"browser",
true,
false,
},
"data stream should be type-namespace-dataset": {
publishSettings{
DataStream: &add_data_stream_index.DataStream{
DataStream: &add_data_stream.DataStream{
Namespace: "myNamespace",
Dataset: "myDataset",
Type: "myType",
},
},
"myType-myDataset-myNamespace",
&add_data_stream.DataStream{
Namespace: "myNamespace",
Dataset: "myDataset",
Type: "myType",
},
"myType",
true,
false,
},
"data stream should use defaults": {
publishSettings{
DataStream: &add_data_stream_index.DataStream{},
DataStream: &add_data_stream.DataStream{},
},
"synthetics-browser-default",
&add_data_stream.DataStream{
Namespace: "default",
Dataset: "browser",
Type: "synthetics",
},
"browser",
true,
false,
Expand All @@ -83,21 +97,49 @@ func TestSetupIndexProcessor(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
e := beat.Event{Meta: common.MapStr{}, Fields: common.MapStr{}}
proc, err := setupIndexProcessor(binfo, tt.settings, tt.monitorType)
procs, err := preProcessors(binfo, tt.settings, tt.monitorType)
if tt.wantErr == true {
require.Error(t, err)
return
}
require.NoError(t, err)

// If we're just setting event.dataset we only get the 1
// otherwise we get a second add_data_stream processor
if !tt.wantProc {
require.Nil(t, proc)
require.Len(t, procs.List, 1)
return
}
require.Len(t, procs.List, 2)

_, err = procs.Run(&e)

t.Run("index name should be set", func(t *testing.T) {
require.NoError(t, err)
require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex])
})

eventDs, err := e.GetValue("event.dataset")
require.NoError(t, err)

t.Run("event.dataset should always be present, preferring data_stream", func(t *testing.T) {
dataset := tt.monitorType
if tt.settings.DataStream != nil && tt.settings.DataStream.Dataset != "" {
dataset = tt.settings.DataStream.Dataset
}
require.Equal(t, dataset, eventDs, "event.dataset be computed correctly")
require.Regexp(t, regexp.MustCompile(`^.+`), eventDs, "should be a string > 1 char")
})

t.Run("event.data_stream", func(t *testing.T) {
dataStreamRaw, _ := e.GetValue("data_stream")
if tt.expectedDatastream != nil {
dataStream := dataStreamRaw.(add_data_stream.DataStream)
require.Equal(t, eventDs, dataStream.Dataset, "event.dataset be identical to data_stream.dataset")

require.NotNil(t, proc)
_, err = proc.Run(&e)
require.Equal(t, tt.expectedIndex, e.Meta[events.FieldMetaRawIndex])
require.Equal(t, *tt.expectedDatastream, dataStream)
}
})
})
}
}
2 changes: 1 addition & 1 deletion heartbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,6 @@ def test_dataset(self):
for output in self.read_output():
self.assertEqual(
output["event.dataset"],
"uptime",
output["monitor.type"],
"Check for event.dataset in {} failed".format(output)
)
Loading