diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 31aae20d241e..6da32b758144 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -615,6 +615,7 @@ field. You can revert this change by configuring tags for the module and omittin - Allow a list of status codes for HTTP checks. {pull}15587[15587] - Add additional ECS compatible fields for TLS information. {pull}17687[17687] - Record HTTP response headers. {pull}18327[18327] +- Add index and pipeline settings to monitor configurations. {pull}20610[20610] *Heartbeat* diff --git a/heartbeat/_meta/config/beat.reference.yml.tmpl b/heartbeat/_meta/config/beat.reference.yml.tmpl index 1b4cd61937af..e72d7fd7ae16 100644 --- a/heartbeat/_meta/config/beat.reference.yml.tmpl +++ b/heartbeat/_meta/config/beat.reference.yml.tmpl @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. 
If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 15185c707deb..26b1202850da 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -72,7 +72,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { config: parsedConfig, scheduler: scheduler, // dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload - dynamicFactory: monitors.NewFactory(scheduler, false), + dynamicFactory: monitors.NewFactory(b.Info, scheduler, false), } return bt, nil } @@ -123,7 +123,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { // RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present. func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { - factory := monitors.NewFactory(bt.scheduler, true) + factory := monitors.NewFactory(b.Info, bt.scheduler, true) for _, cfg := range bt.config.Monitors { created, err := factory.Create(b.Publisher, cfg) diff --git a/heartbeat/docs/monitors/monitor-common-options.asciidoc b/heartbeat/docs/monitors/monitor-common-options.asciidoc index ac1e0b27a1cb..53a7e1242816 100644 --- a/heartbeat/docs/monitors/monitor-common-options.asciidoc +++ b/heartbeat/docs/monitors/monitor-common-options.asciidoc @@ -143,6 +143,30 @@ A list of processors to apply to the data generated by the monitor. See <> for information about specifying processors in your config. +[float] +[[monitor-pipeline]] +==== `pipeline` + +The Ingest Node pipeline ID to set for the events generated by this monitor.
+ +NOTE: The pipeline ID can also be configured in the Elasticsearch output, but +this option usually results in simpler configuration files. If the pipeline is +configured both in the monitor and the output, the option from the +monitor is used. + +[float] +[[monitor-index]] +==== `index` + +If present, this formatted string overrides the index for events from this monitor +(for Elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"heartbeat-myindex-2019.11.01"`. + [float] [[monitor-keep-null]] ==== `keep_null` diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index a0441a4f5dcc..4256c36dbf60 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events.
#keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index e453bc3a03ac..10d039d0830f 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -22,22 +22,52 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fmtstr" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) // RunnerFactory that can be used to create cfg.Runner cast versions of Monitor // suitable for config reloading. type RunnerFactory struct { + info beat.Info sched *scheduler.Scheduler allowWatches bool } +type publishSettings struct { + // Fields and tags to add to monitor. + EventMetadata common.EventMetadata `config:",inline"` + Processors processors.PluginConfig `config:"processors"` + + PublisherPipeline struct { + DisableHost bool `config:"disable_host"` // Disable addition of host.name. + } `config:"publisher_pipeline"` + + // KeepNull determines whether published events will keep null values or omit them. 
+ KeepNull bool `config:"keep_null"` + + // Output meta data settings + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern + DataSet string `config:"dataset"` +} + // NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects. -func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { - return &RunnerFactory{sched, allowWatches} +func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { + return &RunnerFactory{info, sched, allowWatches} } // Create makes a new Runner for a new monitor with the given Config. -func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { +func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runner, error) { + configEditor, err := newCommonPublishConfigs(f.info, c) + if err != nil { + return nil, err + } + + p = pipetool.WithClientConfigEdit(p, configEditor) monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches) return monitor, err } @@ -46,3 +76,67 @@ func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgf func (f *RunnerFactory) CheckConfig(config *common.Config) error { return checkMonitorConfig(config, globalPluginsReg, f.allowWatches) } + +func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) { + var settings publishSettings + if err := cfg.Unpack(&settings); err != nil { + return nil, err + } + + var indexProcessor processors.Processor + if !settings.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&settings.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(settings.Processors) + if 
err != nil { + return nil, err + } + + dataset := settings.DataSet + if dataset == "" { + dataset = "uptime" + } + + return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + logp.Info("Client connection with: %#v", clientCfg) + + fields := clientCfg.Processing.Fields.Clone() + fields.Put("event.dataset", dataset) + + meta := clientCfg.Processing.Meta.Clone() + if settings.Pipeline != "" { + meta.Put("pipeline", settings.Pipeline) + } + + // assemble the processors. Ordering is important. + // 1. add support for index configuration via processor + // 2. add processors added by the input that wants to connect + // 3. add locally configured processors from the 'processors' settings + procs := processors.NewList(nil) + if indexProcessor != nil { + procs.AddProcessor(indexProcessor) + } + if lst := clientCfg.Processing.Processor; lst != nil { + procs.AddProcessor(lst) + } + if userProcessors != nil { + procs.AddProcessors(*userProcessors) + } + + clientCfg.Processing.EventMetadata = settings.EventMetadata + clientCfg.Processing.Fields = fields + clientCfg.Processing.Meta = meta + clientCfg.Processing.Processor = procs + clientCfg.Processing.KeepNull = settings.KeepNull + clientCfg.Processing.DisableHost = settings.PublisherPipeline.DisableHost + + return clientCfg, nil + }, nil +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 0853b5547183..eb53ddeb1953 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -21,8 +21,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" - "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/scheduler" @@ -30,39 +28,24 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/processors" ) // configuredJob represents a job combined with its config and any // 
subsequent processors. type configuredJob struct { - job jobs.Job - config jobConfig - monitor *Monitor - processors *processors.Processors - cancelFn context.CancelFunc - client beat.Client + job jobs.Job + config jobConfig + monitor *Monitor + cancelFn context.CancelFunc + client beat.Client } func newConfiguredJob(job jobs.Job, config jobConfig, monitor *Monitor) (*configuredJob, error) { - t := &configuredJob{ + return &configuredJob{ job: job, config: config, monitor: monitor, - } - - processors, err := processors.New(config.Processors) - if err != nil { - return nil, ProcessorsError{err} - } - t.processors = processors - - if err != nil { - logp.Critical("Could not create client for monitor configuredJob %+v", t.monitor) - return nil, errors.Wrap(err, "could not create client for monitor configuredJob") - } - - return t, nil + }, nil } // jobConfig represents fields needed to execute a single job. @@ -70,13 +53,6 @@ type jobConfig struct { Name string `config:"pluginName"` Type string `config:"type"` Schedule *schedule.Schedule `config:"schedule" validate:"required"` - - // Fields and tags to add to monitor. - EventMetadata common.EventMetadata `config:",inline"` - Processors processors.PluginConfig `config:"processors"` - - // KeepNull determines whether published events will keep null values or omit them. - KeepNull bool `config:"keep_null"` } // ProcessorsError is used to indicate situations when processors could not be loaded. 
@@ -101,15 +77,7 @@ func (t *configuredJob) makeSchedulerTaskFunc() scheduler.TaskFunc { func (t *configuredJob) Start() { var err error - fields := common.MapStr{"event": common.MapStr{"dataset": "uptime"}} - t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - EventMetadata: t.config.EventMetadata, - Processor: t.processors, - KeepNull: t.config.KeepNull, - Fields: fields, - }, - }) + t.client, err = t.monitor.pipelineConnector.Connect() if err != nil { logp.Err("could not start monitor: %v", err) return diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index a0441a4f5dcc..4256c36dbf60 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -80,6 +80,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -169,6 +177,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false @@ -261,6 +277,14 @@ heartbeat.monitors: # Interval between file file changed checks. #interval: 5s + # The Ingest Node pipeline ID associated with this input. 
If this is set, it + # overwrites the pipeline option from the Elasticsearch output. + #pipeline: + + # The index name associated with this input. If this is set, it + # overwrites the index option from the Elasticsearch output. + #index: + # Set to true to publish fields with null values in events. #keep_null: false