Skip to content

Commit

Permalink
Heartbeat: move configuration of common input setting to the runner f…
Browse files Browse the repository at this point in the history
…actory (elastic#20610)
  • Loading branch information
Steffen Siering authored Aug 20, 2020
1 parent 7de72d6 commit 61eaf3b
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Allow a list of status codes for HTTP checks. {pull}15587[15587]
- Add additional ECS compatible fields for TLS information. {pull}17687[17687]
- Record HTTP response headers. {pull}18327[18327]
- Add index and pipeline settings to monitor configurations. {pull}20610[20610]

*Journalbeat*

Expand Down
24 changes: 24 additions & 0 deletions heartbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -169,6 +177,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -261,6 +277,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
config: parsedConfig,
scheduler: scheduler,
// dynamicFactory is the factory used for dynamic configs, e.g. autodiscover / reload
dynamicFactory: monitors.NewFactory(scheduler, false),
dynamicFactory: monitors.NewFactory(b.Info, scheduler, false),
}
return bt, nil
}
Expand Down Expand Up @@ -123,7 +123,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error {
factory := monitors.NewFactory(bt.scheduler, true)
factory := monitors.NewFactory(b.Info, bt.scheduler, true)

for _, cfg := range bt.config.Monitors {
created, err := factory.Create(b.Publisher, cfg)
Expand Down
24 changes: 24 additions & 0 deletions heartbeat/docs/monitors/monitor-common-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,30 @@ A list of processors to apply to the data generated by the monitor.
See <<filtering-and-enhancing-data>> for information about specifying
processors in your config.

[float]
[[monitor-pipeline]]
===== `pipeline`

The Ingest Node pipeline ID to set for the events generated by this input.

NOTE: The pipeline ID can also be configured in the Elasticsearch output, but
this option usually results in simpler configuration files. If the pipeline is
configured both in the input and output, the option from the
input is used.

[float]
[[monitor-index]]
===== `index`

If present, this formatted string overrides the index for events from this input
(for elasticsearch outputs), or sets the `raw_index` field of the event's
metadata (for other outputs). This string can only refer to the agent name and
version and the event timestamp; for access to dynamic fields, use
`output.elasticsearch.index` or a processor.

Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might
expand to `"heartbeat-myindex-2019.11.01"`.

[float]
[[monitor-keep-null]]
==== `keep_null`
Expand Down
24 changes: 24 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -169,6 +177,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down Expand Up @@ -261,6 +277,14 @@ heartbeat.monitors:
# Interval between file file changed checks.
#interval: 5s

# The Ingest Node pipeline ID associated with this input. If this is set, it
# overwrites the pipeline option from the Elasticsearch output.
#pipeline:

# The index name associated with this input. If this is set, it
# overwrites the index option from the Elasticsearch output.
#index:

# Set to true to publish fields with null values in events.
#keep_null: false

Expand Down
100 changes: 97 additions & 3 deletions heartbeat/monitors/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,52 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

// RunnerFactory that can be used to create cfg.Runner cast versions of Monitor
// suitable for config reloading.
type RunnerFactory struct {
info beat.Info
sched *scheduler.Scheduler
allowWatches bool
}

type publishSettings struct {
// Fields and tags to add to monitor.
EventMetadata common.EventMetadata `config:",inline"`
Processors processors.PluginConfig `config:"processors"`

PublisherPipeline struct {
DisableHost bool `config:"disable_host"` // Disable addition of host.name.
} `config:"publisher_pipeline"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
DataSet string `config:"dataset"`
}

// NewFactory takes a scheduler and creates a RunnerFactory that can create cfgfile.Runner(Monitor) objects.
func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
return &RunnerFactory{sched, allowWatches}
func NewFactory(info beat.Info, sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory {
return &RunnerFactory{info, sched, allowWatches}
}

// Create makes a new Runner for a new monitor with the given Config.
func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) {
func (f *RunnerFactory) Create(p beat.Pipeline, c *common.Config) (cfgfile.Runner, error) {
configEditor, err := newCommonPublishConfigs(f.info, c)
if err != nil {
return nil, err
}

p = pipetool.WithClientConfigEdit(p, configEditor)
monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches)
return monitor, err
}
Expand All @@ -46,3 +76,67 @@ func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgf
func (f *RunnerFactory) CheckConfig(config *common.Config) error {
return checkMonitorConfig(config, globalPluginsReg, f.allowWatches)
}

func newCommonPublishConfigs(info beat.Info, cfg *common.Config) (pipetool.ConfigEditor, error) {
var settings publishSettings
if err := cfg.Unpack(&settings); err != nil {
return nil, err
}

var indexProcessor processors.Processor
if !settings.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(info.Beat, info.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&settings.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(settings.Processors)
if err != nil {
return nil, err
}

dataset := settings.DataSet
if dataset == "" {
dataset = "uptime"
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
logp.Info("Client connection with: %#v", clientCfg)

fields := clientCfg.Processing.Fields.Clone()
fields.Put("event.dataset", dataset)

meta := clientCfg.Processing.Meta.Clone()
if settings.Pipeline != "" {
meta.Put("pipeline", settings.Pipeline)
}

// assemble the processors. Ordering is important.
// 1. add support for index configuration via processor
// 2. add processors added by the input that wants to connect
// 3. add locally configured processors from the 'processors' settings
procs := processors.NewList(nil)
if indexProcessor != nil {
procs.AddProcessor(indexProcessor)
}
if lst := clientCfg.Processing.Processor; lst != nil {
procs.AddProcessor(lst)
}
if userProcessors != nil {
procs.AddProcessors(*userProcessors)
}

clientCfg.Processing.EventMetadata = settings.EventMetadata
clientCfg.Processing.Fields = fields
clientCfg.Processing.Meta = meta
clientCfg.Processing.Processor = procs
clientCfg.Processing.KeepNull = settings.KeepNull
clientCfg.Processing.DisableHost = settings.PublisherPipeline.DisableHost

return clientCfg, nil
}, nil
}
48 changes: 8 additions & 40 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,38 @@ import (
"context"
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
)

// configuredJob represents a job combined with its config and any
// subsequent processors.
type configuredJob struct {
job jobs.Job
config jobConfig
monitor *Monitor
processors *processors.Processors
cancelFn context.CancelFunc
client beat.Client
job jobs.Job
config jobConfig
monitor *Monitor
cancelFn context.CancelFunc
client beat.Client
}

func newConfiguredJob(job jobs.Job, config jobConfig, monitor *Monitor) (*configuredJob, error) {
t := &configuredJob{
return &configuredJob{
job: job,
config: config,
monitor: monitor,
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, ProcessorsError{err}
}
t.processors = processors

if err != nil {
logp.Critical("Could not create client for monitor configuredJob %+v", t.monitor)
return nil, errors.Wrap(err, "could not create client for monitor configuredJob")
}

return t, nil
}, nil
}

// jobConfig represents fields needed to execute a single job.
type jobConfig struct {
Name string `config:"pluginName"`
Type string `config:"type"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`

// Fields and tags to add to monitor.
EventMetadata common.EventMetadata `config:",inline"`
Processors processors.PluginConfig `config:"processors"`

// KeepNull determines whether published events will keep null values or omit them.
KeepNull bool `config:"keep_null"`
}

// ProcessorsError is used to indicate situations when processors could not be loaded.
Expand All @@ -101,15 +77,7 @@ func (t *configuredJob) makeSchedulerTaskFunc() scheduler.TaskFunc {
func (t *configuredJob) Start() {
var err error

fields := common.MapStr{"event": common.MapStr{"dataset": "uptime"}}
t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
KeepNull: t.config.KeepNull,
Fields: fields,
},
})
t.client, err = t.monitor.pipelineConnector.Connect()
if err != nil {
logp.Err("could not start monitor: %v", err)
return
Expand Down
Loading

0 comments on commit 61eaf3b

Please sign in to comment.