diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 080efbbaa946..fc48da844939 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -93,6 +93,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
 - close_timeout is also applied when the output is blocking. {pull}3511[3511]
 - Improve handling of different path variants on Windows. {pull}3781[3781]
 - Restructure input.Event to be inline with outputs.Data {pull}3823[3823]
+- Add base for supporting prospector level processors {pull}3853[3853]
 
 *Heartbeat*
 
diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go
index a5fd6b90b473..839fa2e79326 100644
--- a/filebeat/prospector/config.go
+++ b/filebeat/prospector/config.go
@@ -8,6 +8,7 @@ import (
 	"github.com/elastic/beats/filebeat/harvester/reader"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/common/match"
+	"github.com/elastic/beats/libbeat/processors"
 )
 
 var (
@@ -26,23 +27,24 @@ var (
 )
 
 type prospectorConfig struct {
-	common.EventMetadata `config:",inline"` // Fields and tags to add to events.
-	Enabled              bool               `config:"enabled"`
-	DocumentType         string             `config:"document_type"`
-	ExcludeFiles         []match.Matcher    `config:"exclude_files"`
-	IgnoreOlder          time.Duration      `config:"ignore_older"`
-	Paths                []string           `config:"paths"`
-	ScanFrequency        time.Duration      `config:"scan_frequency" validate:"min=0,nonzero"`
-	InputType            string             `config:"input_type"`
-	CleanInactive        time.Duration      `config:"clean_inactive" validate:"min=0"`
-	CleanRemoved         bool               `config:"clean_removed"`
-	HarvesterLimit       uint64             `config:"harvester_limit" validate:"min=0"`
-	Symlinks             bool               `config:"symlinks"`
-	TailFiles            bool               `config:"tail_files"`
-	JSON                 *reader.JSONConfig `config:"json"`
-	Pipeline             string             `config:"pipeline"`
-	Module               string             `config:"_module_name"`  // hidden option to set the module name
-	Fileset              string             `config:"_fileset_name"` // hidden option to set the fileset name
+	common.EventMetadata `config:",inline"`      // Fields and tags to add to events.
+	Enabled              bool                    `config:"enabled"`
+	DocumentType         string                  `config:"document_type"`
+	ExcludeFiles         []match.Matcher         `config:"exclude_files"`
+	IgnoreOlder          time.Duration           `config:"ignore_older"`
+	Paths                []string                `config:"paths"`
+	ScanFrequency        time.Duration           `config:"scan_frequency" validate:"min=0,nonzero"`
+	InputType            string                  `config:"input_type"`
+	CleanInactive        time.Duration           `config:"clean_inactive" validate:"min=0"`
+	CleanRemoved         bool                    `config:"clean_removed"`
+	HarvesterLimit       uint64                  `config:"harvester_limit" validate:"min=0"`
+	Symlinks             bool                    `config:"symlinks"`
+	TailFiles            bool                    `config:"tail_files"`
+	JSON                 *reader.JSONConfig      `config:"json"`
+	Pipeline             string                  `config:"pipeline"`
+	Module               string                  `config:"_module_name"`   // hidden option to set the module name
+	Fileset              string                  `config:"_fileset_name"`  // hidden option to set the fileset name
+	Processors           processors.PluginConfig `config:"processors"`
 }
 
 func (config *prospectorConfig) Validate() error {
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index b29c41ea382a..f2435bdc51d9 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -16,6 +16,7 @@ import (
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/monitoring"
+	"github.com/elastic/beats/libbeat/processors"
 )
 
 var (
@@ -40,6 +41,7 @@ type Prospector struct {
 	registry     *harvesterRegistry
 	beatDone     chan struct{}
 	eventCounter *sync.WaitGroup
+	processors   *processors.Processors
 }
 
 // Prospectorer is the interface common to all prospectors
@@ -84,6 +86,13 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*
 		return nil, err
 	}
 
+	f, err := processors.New(prospector.config.Processors)
+	if err != nil {
+		return nil, err
+	}
+
+	prospector.processors = f
+
 	logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)
 
 	return prospector, nil
@@ -215,6 +224,15 @@ func (p *Prospector) updateState(event *input.Event) error {
 	event.Fileset = p.config.Fileset
 
 	eventHolder := event.GetData()
+	// Run the processors before sending the event to the spooler.
+	if event.Bytes > 0 {
+		eventHolder.Event = p.processors.Run(eventHolder.Event)
+	}
+
+	if eventHolder.Event == nil {
+		eventHolder.Metadata.Bytes = 0
+	}
+
 	ok := p.outlet.OnEvent(&eventHolder)
 
 	if !ok {
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index 778678c63add..b3c6d413d3ff 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -28,7 +28,20 @@ filebeat.prospectors:
   harvester_limit: {{harvester_limit | default(0) }}
   symlinks: {{symlinks}}
   pipeline: {{pipeline}}
-
+  {%- if prospector_processors %}
+  processors:
+  {%- for processor in prospector_processors %}
+  {%- for name, settings in processor.iteritems() %}
+  - {{name}}:
+    {%- if settings %}
+    {%- for k, v in settings.iteritems() %}
+      {{k}}:
+        {{v | default([])}}
+    {%- endfor %}
+    {%- endif %}
+  {%- endfor %}
+  {%- endfor %}
+  {% endif %}
   {% if fields %}
   fields:
   {% for k,v in fields.items() %}
diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py
index f1e34a20347b..c704ae34193a 100644
--- a/filebeat/tests/system/test_prospector.py
+++ b/filebeat/tests/system/test_prospector.py
@@ -632,3 +632,53 @@ def test_harvester_limit(self):
         assert len(data) == 3
 
         filebeat.check_kill_and_wait()
+
+    def test_prospector_filter_dropfields(self):
+        """
+        Check drop_fields filtering action at a prospector level
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/test.log",
+            prospector_processors=[{
+                "drop_fields": {
+                    "fields": ["offset"],
+                },
+            }]
+        )
+        with open(self.working_dir + "/test.log", "w") as f:
+            f.write("test message\n")
+
+        filebeat = self.start_beat()
+        self.wait_until(lambda: self.output_has(lines=1))
+        filebeat.check_kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+        assert "offset" not in output
+        assert "message" in output
+
+    def test_prospector_filter_includefields(self):
+        """
+        Check include_fields filtering action at a prospector level
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/test.log",
+            prospector_processors=[{
+                "include_fields": {
+                    "fields": ["offset"],
+                },
+            }]
+        )
+        with open(self.working_dir + "/test.log", "w") as f:
+            f.write("test message\n")
+
+        filebeat = self.start_beat()
+        self.wait_until(lambda: self.output_has(lines=1))
+        filebeat.check_kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+        assert "message" not in output
+        assert "offset" in output
diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py
index 1df110c4efb2..e7cde04e9b1b 100644
--- a/filebeat/tests/system/test_registrar.py
+++ b/filebeat/tests/system/test_registrar.py
@@ -1312,3 +1312,79 @@ def test_ignore_older_state_clean_inactive(self):
         data = self.get_registry()
 
         assert len(data) == 0
+
+    def test_registrar_files_with_prospector_level_processors(self):
+        """
+        Check that multiple files are put into the registrar file with a drop_event processor
+        """
+
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            prospector_processors=[{
+                "drop_event": {},
+            }]
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile_path1 = self.working_dir + "/log/test1.log"
+        testfile_path2 = self.working_dir + "/log/test2.log"
+        file1 = open(testfile_path1, 'w')
+        file2 = open(testfile_path2, 'w')
+
+        iterations = 5
+        for _ in range(0, iterations):
+            file1.write("hello world")  # 11 chars
+            file1.write("\n")  # 1 char
+            file2.write("goodbye world")  # 13 chars
+            file2.write("\n")  # 1 char
+
+        file1.close()
+        file2.close()
+
+        filebeat = self.start_beat()
+
+        # wait until the registry file exists. Needed to avoid a race between
+        # the logging and actually writing the file. Seems to happen on Windows.
+        self.wait_until(
+            lambda: os.path.isfile(os.path.join(self.working_dir,
+                                                "registry")),
+            max_timeout=10)
+
+        # Wait a moment to make sure the registry is completely written
+        time.sleep(2)
+
+        filebeat.check_kill_and_wait()
+
+        # Check that the registry file exists
+        data = self.get_registry()
+
+        # Check that the 2 files are part of the registrar file
+        assert len(data) == 2
+
+        logfile_abs_path = os.path.abspath(testfile_path1)
+        record = self.get_registry_entry_by_path(logfile_abs_path)
+
+        self.assertDictContainsSubset({
+            "source": logfile_abs_path,
+            "offset": iterations * (len("hello world") + len(os.linesep)),
+        }, record)
+        self.assertTrue("FileStateOS" in record)
+        file_state_os = record["FileStateOS"]
+
+        if os.name == "nt":
+            # Windows checks
+            # TODO: Check for IdxHi, IdxLo, Vol in FileStateOS on Windows.
+            self.assertEqual(len(file_state_os), 3)
+        elif platform.system() == "SunOS":
+            stat = os.stat(logfile_abs_path)
+            self.assertEqual(file_state_os["inode"], stat.st_ino)
+
+            # Python does not return the same st_dev value as Golang or the
+            # command line stat tool so just check that it's present.
+            self.assertTrue("device" in file_state_os)
+        else:
+            stat = os.stat(logfile_abs_path)
+            self.assertDictContainsSubset({
+                "inode": stat.st_ino,
+                "device": stat.st_dev,
+            }, file_state_os)
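
For reference, below is a minimal sketch of how the prospector level processors option introduced by this change could look in filebeat.yml. The input_type, paths value, and the drop_fields settings are illustrative only; they mirror what the system tests above render through filebeat.yml.j2 and should not be read as documented syntax. Processors configured here run per prospector, before the event is handed to the spooler (see the updateState change above); if a processor such as drop_event discards the event entirely, its offset is still recorded in the registry, as the registrar test above checks.

    filebeat.prospectors:
    - input_type: log
      paths:
        - /var/log/app/*.log
      # Prospector-level processors added by this change; the individual
      # processors (drop_fields, include_fields, drop_event, ...) are the
      # existing libbeat processors, now applied before the spooler.
      processors:
        - drop_fields:
            fields: ["offset"]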