From c1384ad1efa1d4e8a33c01551ea9e28c540f2087 Mon Sep 17 00:00:00 2001
From: Tudor Golubenco
Date: Fri, 20 Jan 2017 13:40:41 +0100
Subject: [PATCH 1/2] Per prospector configurable pipeline

This adds a new "pipeline" configuration option to the prospector, which can
be used to set the Elasticsearch Ingest Node pipeline from the prospector
config. While this was already possible by using format strings in the
`pipeline` config from the output, this makes the configuration simpler in
many cases, and the mechanism is needed for the Filebeat modules.

Part of #3159.
---
 CHANGELOG.asciidoc                            |  1 +
 filebeat/_meta/beat.full.yml                  |  4 ++
 .../configuration/filebeat-options.asciidoc   |  8 +++
 filebeat/filebeat.full.yml                    |  4 ++
 filebeat/harvester/config.go                  |  1 +
 filebeat/harvester/log.go                     |  1 +
 filebeat/input/event.go                       | 11 ++++
 filebeat/publisher/async.go                   |  6 +-
 filebeat/publisher/publisher.go               |  9 ++-
 filebeat/publisher/sync.go                    |  4 +-
 filebeat/tests/system/config/filebeat.yml.j2  | 12 +++-
 filebeat/tests/system/test_modules.py         | 62 +++++++++++++++++++
 libbeat/publisher/client.go                   |  2 +-
 13 files changed, 116 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 84abfc0d25db..edad6ec64b41 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
 *Filebeat*
 
 - Add enabled config option to prospectors. {pull}3157[3157]
 - Add target option for decoded_json_field. {pull}3169[3169]
+- Add the `pipeline` config option at the prospector level, for configuring the Ingest Node pipeline ID. {pull}3433[3433]
 
 *Winlogbeat*
diff --git a/filebeat/_meta/beat.full.yml b/filebeat/_meta/beat.full.yml
index 7de2e6522201..fc9c0f19741a 100644
--- a/filebeat/_meta/beat.full.yml
+++ b/filebeat/_meta/beat.full.yml
@@ -143,6 +143,10 @@ filebeat.prospectors:
   # this can mean that the first entries of a new file are skipped.
   #tail_files: false
 
+  # The Ingest Node pipeline ID associated with this prospector. If this is set, it
+  # overrides the pipeline option from the Elasticsearch output.
+  #pipeline:
+
   # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester opens the
   # original for harvesting but will report the symlink name as source.
   #symlinks: false
diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
index 599697728f23..5bb509c49ff2 100644
--- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc
+++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -405,6 +405,14 @@ This option applies to files that Filebeat has not already processed. If you ran
 NOTE: You can use this setting to avoid indexing old log lines when you run
 Filebeat on a set of log files for the first time. After the first run, we
 recommend disabling this option, or you risk losing lines during file rotation.
 
+===== pipeline
+
+The Ingest Node pipeline ID to set for the events generated by this prospector.
+
+NOTE: The pipeline ID can also be configured in the Elasticsearch output, but
+setting it at the prospector level usually results in simpler configuration
+files. If the pipeline is configured both in the prospector and in the output,
+the option from the prospector takes precedence.
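+
+For example, the following prospector configuration (a minimal sketch; the
+path and the pipeline ID are placeholders for values from your own setup)
+sends every event harvested from the matching files through that Ingest Node
+pipeline:
+
+[source,yaml]
+-------------------------------------------------------------------------------
+filebeat.prospectors:
+- input_type: log
+  paths:
+    - /var/log/apache/access.log
+  # "clean_apache_logs" is only an illustrative name; it must be the ID of a
+  # pipeline that already exists in Elasticsearch.
+  pipeline: clean_apache_logs
+-------------------------------------------------------------------------------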
+
 
 ===== symlinks
 
 experimental[]
 
diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml
index 4fec95ecadf1..bd2e0ce5e574 100644
--- a/filebeat/filebeat.full.yml
+++ b/filebeat/filebeat.full.yml
@@ -143,6 +143,10 @@ filebeat.prospectors:
   # this can mean that the first entries of a new file are skipped.
   #tail_files: false
 
+  # The Ingest Node pipeline ID associated with this prospector. If this is set, it
+  # overrides the pipeline option from the Elasticsearch output.
+  #pipeline:
+
   # Experimental: If symlinks is enabled, symlinks are opened and harvested. The harvester opens the
   # original for harvesting but will report the symlink name as source.
   #symlinks: false
diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go
index e5fcded5292a..d1baaf9a6385 100644
--- a/filebeat/harvester/config.go
+++ b/filebeat/harvester/config.go
@@ -52,6 +52,7 @@ type harvesterConfig struct {
 	MaxBytes      int                     `config:"max_bytes" validate:"min=0,nonzero"`
 	Multiline     *reader.MultilineConfig `config:"multiline"`
 	JSON          *reader.JSONConfig      `config:"json"`
+	Pipeline      string                  `config:"pipeline"`
 }
 
 func (config *harvesterConfig) Validate() error {
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index afbd0a6bd896..a47bf67809c8 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -137,6 +137,7 @@ func (h *Harvester) Harvest(r reader.Reader) {
 		event.InputType = h.config.InputType
 		event.DocumentType = h.config.DocumentType
 		event.JSONConfig = h.config.JSON
+		event.Pipeline = h.config.Pipeline
 	}
 
 	// Always send event to update state, even if lines were skipped
diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index 415de417842c..37a9bc8430f9 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -20,6 +20,7 @@ type Event struct {
 	JSONConfig   *reader.JSONConfig
 	State        file.State
 	Data         common.MapStr // Use in readers to add data to the event
+	Pipeline     string
 }
 
 func NewEvent(state file.State) *Event {
@@ -58,6 +59,16 @@ func (e *Event) ToMapStr() common.MapStr {
 	return event
 }
 
+// Metadata creates a common.MapStr containing the metadata to
+// be associated with the event.
+func (e *Event) Metadata() common.MapStr {
+	meta := common.MapStr{}
+	if len(e.Pipeline) > 0 {
+		meta["pipeline"] = e.Pipeline
+	}
+	return meta
+}
+
 // HasData returns true if the event itself contains data
 // Events without data are only state updates
 func (e *Event) HasData() bool {
diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go
index 93ed5e5ceb17..38ef9af77baa 100644
--- a/filebeat/publisher/async.go
+++ b/filebeat/publisher/async.go
@@ -84,10 +84,12 @@ func (p *asyncLogPublisher) Start() {
 				flag:   0,
 				events: events,
 			}
+			dataEvents, meta := getDataEvents(events)
 			p.client.PublishEvents(
-				getDataEvents(events),
+				dataEvents,
 				publisher.Signal(batch),
-				publisher.Guaranteed)
+				publisher.Guaranteed,
+				publisher.MetadataBatch(meta))
 
 			p.active.append(batch)
 		case <-ticker.C:
diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go
index 07bbf5fefb98..914208b9679d 100644
--- a/filebeat/publisher/publisher.go
+++ b/filebeat/publisher/publisher.go
@@ -45,12 +45,15 @@ var (
 )
 
 // getDataEvents returns all events which contain data (not only state updates)
-func getDataEvents(events []*input.Event) []common.MapStr {
-	dataEvents := make([]common.MapStr, 0, len(events))
+// together with their associated metadata
+func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) {
+	dataEvents = make([]common.MapStr, 0, len(events))
+	meta = make([]common.MapStr, 0, len(events))
 	for _, event := range events {
 		if event.HasData() {
 			dataEvents = append(dataEvents, event.ToMapStr())
+			meta = append(meta, event.Metadata())
 		}
 	}
-	return dataEvents
+	return dataEvents, meta
 }
diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go
index 9aba6f925c1b..5b4885959dc8 100644
--- a/filebeat/publisher/sync.go
+++ b/filebeat/publisher/sync.go
@@ -58,7 +58,9 @@ func (p *syncLogPublisher) Publish() error {
 	case events = <-p.in:
 	}
 
-	ok := p.client.PublishEvents(getDataEvents(events), publisher.Sync, publisher.Guaranteed)
+	dataEvents, meta := getDataEvents(events)
+	ok := p.client.PublishEvents(dataEvents, publisher.Sync, publisher.Guaranteed,
+		publisher.MetadataBatch(meta))
 	if !ok {
 		// PublishEvents only returns false if p.client has been closed.
 		return sigPublisherStop
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index c3cc5aa22207..6cbe77641923 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -28,6 +28,7 @@ filebeat.prospectors:
       clean_removed: {{clean_removed}}
      harvester_limit: {{harvester_limit | default(0) }}
       symlinks: {{symlinks}}
+      pipeline: {{pipeline}}
 
 {% if fields %}
   fields:
@@ -132,11 +133,18 @@ processors:
 
 # Configure what outputs to use when sending the data collected by the beat.
 # Multiple outputs may be used.
-#------------------------------- File output ----------------------------------
-{%- if logstash %}
+{%- if elasticsearch %}
+#------------------------------- Elasticsearch output ----------------------------
+output.elasticsearch:
+  hosts: ["{{ elasticsearch.host }}"]
+  pipeline: {{elasticsearch.pipeline}}
+  index: {{elasticsearch.index}}
+{%- elif logstash %}
+#------------------------------- Logstash output ---------------------------------
 output.logstash:
   hosts: ["{{ logstash.host }}"]
 {%- else %}
+#------------------------------- File output ----------------------------------
 output.file:
   path: {{ output_file_path|default(beat.working_dir + "/output") }}
   filename: "{{ output_file_filename|default("filebeat") }}"
diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py
index 845b6dff02cd..2f439a6418a9 100644
--- a/filebeat/tests/system/test_modules.py
+++ b/filebeat/tests/system/test_modules.py
@@ -110,3 +110,65 @@ def run_on_file(self, module, fileset, test_file, cfgfile):
         if not found:
             raise Exception("The following expected object was" +
                             " not found: {}".format(obj))
+
+    @unittest.skipIf(not INTEGRATION_TESTS or
+                     os.getenv("TESTING_ENVIRONMENT") == "2x",
+                     "integration test not available on 2.x")
+    def test_prospector_pipeline_config(self):
+        """
+        Tests that the pipeline configured in the prospector overrides
+        the one from the output.
+        """
+        self.init()
+        index_name = "filebeat-test-prospector"
+        try:
+            self.es.indices.delete(index=index_name)
+        except Exception:
+            pass
+        self.wait_until(lambda: not self.es.indices.exists(index_name))
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            elasticsearch=dict(
+                host=self.elasticsearch_url,
+                pipeline="estest",
+                index=index_name),
+            pipeline="test",
+        )
+
+        os.mkdir(self.working_dir + "/log/")
+        testfile = self.working_dir + "/log/test.log"
+        with open(testfile, 'a') as f:
+            f.write("Hello World1\n")
+
+        # Create the test pipeline in Elasticsearch
+        self.es.transport.perform_request("PUT", "/_ingest/pipeline/test",
+                                          body={
+                                              "processors": [{
+                                                  "set": {
+                                                      "field": "x-pipeline",
+                                                      "value": "test-pipeline",
+                                                  }
+                                              }]})
+
+        filebeat = self.start_beat()
+
+        # Wait until the event is in ES
+        self.wait_until(lambda: self.es.indices.exists(index_name))
+
+        def search_objects():
+            try:
+                self.es.indices.refresh(index=index_name)
+                res = self.es.search(index=index_name,
+                                     body={"query": {"match_all": {}}})
+                return [o["_source"] for o in res["hits"]["hits"]]
+            except Exception:
+                return []
+
+        self.wait_until(lambda: len(search_objects()) > 0, max_timeout=20)
+        filebeat.check_kill_and_wait()
+
+        objects = search_objects()
+        assert len(objects) == 1
+        o = objects[0]
+        assert o["x-pipeline"] == "test-pipeline"
diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go
index 246151a96a1b..2001f74b790b 100644
--- a/libbeat/publisher/client.go
+++ b/libbeat/publisher/client.go
@@ -238,5 +238,5 @@ func MakeContext(opts []ClientOption) ([]common.MapStr, Context) {
 			}
 		}
 	}
-	return nil, ctx
+	return meta, ctx
 }

From 45695cf5ad00b18f14a56eff02d41f513bdb0841 Mon Sep 17 00:00:00 2001
From: Tudor Golubenco
Date: Mon, 23 Jan 2017 14:58:33 +0100
Subject: [PATCH 2/2] Use nil if no pipeline is defined

---
 filebeat/input/event.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index 37a9bc8430f9..3d9a88526002 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -62,11 +62,12 @@ func (e *Event) ToMapStr() common.MapStr {
 // Metadata creates a common.MapStr containing the metadata to
 // be associated with the event.
 func (e *Event) Metadata() common.MapStr {
-	meta := common.MapStr{}
-	if len(e.Pipeline) > 0 {
-		meta["pipeline"] = e.Pipeline
+	if e.Pipeline != "" {
+		return common.MapStr{
+			"pipeline": e.Pipeline,
+		}
 	}
-	return meta
+	return nil
 }
 
 // HasData returns true if the event itself contains data
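Taken together, the two commits give `Event.Metadata()` a small but load-bearing
contract: a configured pipeline yields a one-entry map, an unset one yields nil,
so no map is allocated for the common case and `MakeContext` can pass the
collected per-event metadata straight through to the output. A test-style
sketch of that contract (illustrative only, not part of the patch; it assumes
the 5.x import path `github.com/elastic/beats/filebeat/input`):

```go
package input_test

import (
	"testing"

	"github.com/elastic/beats/filebeat/input"
)

// Illustrative sketch of the Metadata() contract after PATCH 2/2:
// a configured pipeline yields {"pipeline": <id>}, an unset one yields nil.
func TestEventMetadata(t *testing.T) {
	evt := input.Event{Pipeline: "my-pipeline"} // "my-pipeline" is a placeholder ID
	if meta := evt.Metadata(); meta["pipeline"] != "my-pipeline" {
		t.Errorf("expected pipeline metadata, got %v", meta)
	}

	var empty input.Event
	if meta := empty.Metadata(); meta != nil {
		t.Errorf("expected nil metadata, got %v", meta)
	}
}
```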