Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prospector: cache resolved glob patterns during init #4269

Merged
merged 1 commit into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d

*Filebeat*
- Fix race condition on harvester stopping with reloading enabled. {issue}3779[3779]
- Fix recursive glob config parsing and resolution across restarts. {pull}4269[4269]

*Heartbeat*

Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/file/glob.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func wildcards(doubleStarPatternDepth uint8, dir string, suffix string) []string
return wildcardList
}

// globPattern detects the use of "**" and expands it to standard glob patterns up to a max depth
func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
// GlobPatterns detects the use of "**" and expands it to standard glob patterns up to a max depth
func GlobPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
if doubleStarPatternDepth == 0 {
return []string{pattern}, nil
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error

// Glob expands '**' patterns into multiple patterns to satisfy https://golang.org/pkg/path/filepath/#Match
func Glob(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
patterns, err := globPatterns(pattern, doubleStarPatternDepth)
patterns, err := GlobPatterns(pattern, doubleStarPatternDepth)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/file/glob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type globPatternsTest struct {

func TestGlobPatterns(t *testing.T) {
for _, test := range globPatternsTests {
patterns, err := globPatterns(test.pattern, 2)
patterns, err := GlobPatterns(test.pattern, 2)
if err != nil {
if test.expectedError {
continue
Expand Down
26 changes: 25 additions & 1 deletion filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand Down Expand Up @@ -59,7 +61,7 @@ type config struct {
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`
RecursiveGlob bool `config:"recursive_glob.enabled"`

// Harvester
BufferSize int `config:"harvester_buffer_size"`
Expand Down Expand Up @@ -117,3 +119,25 @@ func (c *config) Validate() error {

return nil
}

// resolvePaths expands recursive glob ("**") patterns in c.Paths once, so the
// expanded list can be reused instead of being recomputed on every scan.
//
// When recursive_glob.enabled is false, c.Paths is left untouched: no pattern
// is expanded and no entry is duplicated. When it is true, every entry is run
// through file.GlobPatterns with recursiveGlobDepth and c.Paths is replaced by
// the resulting patterns, preserving the original order.
//
// The first expansion error is returned as-is; c.Paths is not modified in
// that case.
func (c *config) resolvePaths() error {
	if !c.RecursiveGlob {
		// Return early: falling through to the expansion loop would expand
		// "**" despite the feature being off and would list every path twice.
		logp.Debug("prospector", "recursive glob disabled")
		return nil
	}

	logp.Debug("prospector", "recursive glob enabled")

	// Build into a fresh slice so appends never write through to the backing
	// array of the c.Paths slice that is being iterated.
	var paths []string
	for _, path := range c.Paths {
		patterns, err := file.GlobPatterns(path, recursiveGlobDepth)
		if err != nil {
			return err
		}
		// Only log entries that actually fanned out into several patterns.
		if len(patterns) > 1 {
			logp.Debug("prospector", "%q expanded to %#v", path, patterns)
		}
		paths = append(paths, patterns...)
	}
	c.Paths = paths
	return nil
}
12 changes: 6 additions & 6 deletions filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Prospector struct {
done chan struct{}
}

// NewLog instantiates a new Log
// NewProspector instantiates a new Prospector from the given config
func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outleter, done chan struct{}) (*Prospector, error) {

p := &Prospector{
Expand All @@ -51,6 +51,10 @@ func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outle
if err := cfg.Unpack(&p.config); err != nil {
return nil, err
}
if err := p.config.resolvePaths(); err != nil {
logp.Err("Failed to resolve paths in config: %+v", err)
return nil, err
}

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
Expand Down Expand Up @@ -175,11 +179,7 @@ func (p *Prospector) getFiles() map[string]os.FileInfo {
paths := map[string]os.FileInfo{}

for _, path := range p.config.Paths {
depth := uint8(0)
if p.config.recursiveGlob {
depth = recursiveGlobDepth
}
matches, err := file.Glob(path, depth)
matches, err := filepath.Glob(path)
if err != nil {
logp.Err("glob(%s) failed: %v", path, err)
continue
Expand Down
2 changes: 2 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ filebeat.prospectors:
# Paths that should be crawled and fetched
{% if path %}paths:
- {{ path }}{% endif %}
{% if recursive_glob %}recursive_glob.enabled: true
{% endif %}
# Type of the files. Annotated in every documented
scan_frequency: {{scan_frequency | default("0.1s") }}
ignore_older: {{ignore_older}}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def test_symlink_rotated(self):

# Check if two different files are in registry
data = self.get_registry()
assert len(data) == 2
assert len(data) == 2, "expected to see 2 entries, got '%s'" % data

def test_symlink_removed(self):
"""
Expand Down
46 changes: 46 additions & 0 deletions filebeat/tests/system/test_prospector.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env python

from filebeat import BaseTest
import os
import time
Expand Down Expand Up @@ -678,3 +680,47 @@ def test_prospector_filter_includefields(self):
)[0]
assert "message" not in output
assert "offset" in output

def test_restart_recursive_glob(self):
"""
Check that file reading via recursive glob patterns continues after restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/**",
scan_frequency="1s",
recursive_glob=True,
)

testfile_dir = os.path.join(self.working_dir, "log", "some", "other", "subdir")
os.makedirs(testfile_dir)
testfile_path = os.path.join(testfile_dir, "input")

filebeat = self.start_beat()

with open(testfile_path, 'w') as testfile:
testfile.write("entry1\n")

self.wait_until(
lambda: self.output_has_message("entry1"),
max_timeout=10,
name="output contains 'entry1'")

filebeat.check_kill_and_wait()

# Append to file
with open(testfile_path, 'a') as testfile:
testfile.write("entry2\n")

filebeat = self.start_beat(output="filebeat2.log")

self.wait_until(
lambda: self.output_has_message("entry2"),
max_timeout=10,
name="output contains 'entry2'")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filebeat must be stopped again here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

filebeat.check_kill_and_wait()


if __name__ == '__main__':
import unittest
unittest.main()
9 changes: 8 additions & 1 deletion filebeat/tests/system/test_registrar.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python
"""Test the registrar"""

import os
import platform
import time
import shutil

from filebeat import BaseTest
from nose.plugins.skip import SkipTest

Expand Down Expand Up @@ -323,7 +325,7 @@ def test_rotating_file_inode(self):

def test_restart_continue(self):
"""
Check that file readining continues after restart
Check that file reading continues after restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
Expand Down Expand Up @@ -1396,3 +1398,8 @@ def test_registrar_files_with_prospector_level_processors(self):
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)


if __name__ == '__main__':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this addition, can you directly "run" the file without prepending it with nosetests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, just need to pip install a couple of things

import unittest
unittest.main()
19 changes: 16 additions & 3 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)


# Raised by wait_until when the polled condition has not become true
# within max_timeout seconds.
class TimeoutError(Exception):
pass


class Proc(object):
"""
Slim wrapper on subprocess.Popen that redirects
Expand Down Expand Up @@ -279,9 +283,8 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
start = datetime.now()
while not cond():
if datetime.now() - start > timedelta(seconds=max_timeout):
raise Exception("Timeout waiting for '{}' to be true. "
.format(name) +
"Waited {} seconds.".format(max_timeout))
raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
"Waited {} seconds.".format(max_timeout))
time.sleep(poll_interval)

def get_log(self, logfile=None):
Expand Down Expand Up @@ -351,6 +354,16 @@ def output_has(self, lines, output_file=None):
except IOError:
return False

def output_has_message(self, message, output_file=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will become handy in other tests too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I tried working with the existing APIs but checking the length wasn't enough since when I restart I want to check for the different message to show up (length still 1)

"""
Returns true if the output has the given message field.
"""
try:
return any(line for line in self.read_output(output_file=output_file, required_fields=["message"])
if line.get("message") == message)
except (IOError, TypeError):
return False

def all_have_fields(self, objs, fields):
"""
Checks that the given list of output objects have
Expand Down