Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Filebeat code #1975

Merged
merged 1 commit into from
Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,11 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

/*
The hierarchy for the crawler objects is explained as following

Crawler: Filebeat has one crawler. The crawler is the single point of control
and stores the state. The state is written through the registrar
Prospector: For every FileConfig the crawler starts a prospector
Harvester: For every file found inside the FileConfig, the Prospector starts a Harvester
The harvesters send their events to the spooler
The spooler sends the event to the publisher
The publisher writes the state down with the registrar
*/

// Crawler is Filebeat's single point of control for log discovery. It
// builds one prospector per entry in prospectorConfigs and hands the
// harvested events on to the shared spooler. wg tracks the started
// prospectors so Stop can wait for them to finish.
//
// NOTE(review): the scraped diff left the removed and added field lines
// interleaved (duplicate wg/spooler); this is the post-change struct.
type Crawler struct {
	prospectors       []*prospector.Prospector
	prospectorConfigs []*common.Config
	spooler           *spooler.Spooler
	wg                sync.WaitGroup
}

func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler, error) {
Expand Down Expand Up @@ -58,7 +46,6 @@ func (c *Crawler) Start(states file.States) error {

logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors))

c.wg = sync.WaitGroup{}
for i, p := range c.prospectors {
c.wg.Add(1)

Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
CloseOlder: 1 * time.Hour,
MaxBytes: 10 * (1 << 20), // 10MB
MaxBytes: 10 * humanize.MiByte,
CloseRemoved: false,
CloseRenamed: false,
CloseEOF: false,
Expand Down
18 changes: 7 additions & 11 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ type Harvester struct {
offset int64
state file.State
prospectorChan chan *input.FileEvent
encoding encoding.EncodingFactory
file source.FileSource /* the file being watched */
ExcludeLinesRegexp []*regexp.Regexp
IncludeLinesRegexp []*regexp.Regexp
done chan struct{}
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
}

func NewHarvester(
Expand All @@ -59,30 +60,25 @@ func NewHarvester(
if err := cfg.Unpack(&h.config); err != nil {
return nil, err
}
if err := h.config.Validate(); err != nil {
return nil, err
}

encoding, ok := encoding.FindEncoding(h.config.Encoding)
if !ok || encoding == nil {
encodingFactory, ok := encoding.FindEncoding(h.config.Encoding)
if !ok || encodingFactory == nil {
return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
}
h.encoding = encoding
h.encodingFactory = encodingFactory

h.ExcludeLinesRegexp = h.config.ExcludeLines
h.IncludeLinesRegexp = h.config.IncludeLines
return h, nil
}

// open does open the file given under h.Path and assigns the file handler to h.file
func (h *Harvester) open() (encoding.Encoding, error) {
func (h *Harvester) open() error {

switch h.config.InputType {
case config.StdinInputType:
return h.openStdin()
case config.LogInputType:
return h.openFile()
default:
return nil, fmt.Errorf("Invalid input type")
return fmt.Errorf("Invalid input type")
}
}
97 changes: 43 additions & 54 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/processor"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -26,31 +25,15 @@ func (h *Harvester) Harvest() {

h.state.Finished = false

enc, err := h.open()
err := h.open()
if err != nil {
logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
return
}

logp.Info("Harvester started for file: %s", h.path)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
cfg := h.config
readerConfig := reader.LogFileReaderConfig{
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
CloseOlder: cfg.CloseOlder,
CloseEOF: cfg.CloseEOF,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
BackoffFactor: cfg.BackoffFactor,
}

processor, err := createLineProcessor(
h.file, enc, cfg.BufferSize, cfg.MaxBytes, readerConfig,
cfg.JSON, cfg.Multiline, h.done)
processor, err := h.newLineProcessor()
if err != nil {
logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err)
return
Expand All @@ -62,7 +45,6 @@ func (h *Harvester) Harvest() {
}

for {

select {
case <-h.done:
return
Expand Down Expand Up @@ -101,6 +83,7 @@ func (h *Harvester) Harvest() {
}

// Always send event to update state, also if lines was skipped
// Stop harvester in case of an error
if !h.sendEvent(event) {
return
}
Expand Down Expand Up @@ -140,15 +123,15 @@ func (h *Harvester) sendEvent(event *input.FileEvent) bool {
// shouldExportLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
if len(h.IncludeLinesRegexp) > 0 {
if !MatchAnyRegexps(h.IncludeLinesRegexp, line) {
if len(h.config.IncludeLines) > 0 {
if !MatchAnyRegexps(h.config.IncludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.ExcludeLinesRegexp) > 0 {
if MatchAnyRegexps(h.ExcludeLinesRegexp, line) {
if len(h.config.ExcludeLines) > 0 {
if MatchAnyRegexps(h.config.ExcludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
return false
Expand All @@ -163,66 +146,63 @@ func (h *Harvester) shouldExportLine(line string) bool {
// or the file cannot be opened because for example of failing read permissions, an error
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (h *Harvester) openFile() (encoding.Encoding, error) {
var encoding encoding.Encoding
func (h *Harvester) openFile() error {

f, err := file.ReadOpen(h.path)
if err != nil {
logp.Err("Failed opening %s: %s", h.path, err)
return nil, err
return err
}

// Check we are not following a rabbit hole (symlinks, etc.)
if !file.IsRegular(f) {
return nil, errors.New("Given file is not a regular file.")
return errors.New("Given file is not a regular file.")
}

info, err := f.Stat()
if err != nil {
logp.Err("Failed getting stats for file %s: %s", h.path, err)
return nil, err
return err
}
// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
if !os.SameFile(h.state.Fileinfo, info) {
return nil, errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
return errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
}

encoding, err = h.encoding(f)
h.encoding, err = h.encodingFactory(f)
if err != nil {

if err == transform.ErrShortSrc {
logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
} else {
logp.Err("Initialising encoding for '%v' failed: %v", f, err)
}
return nil, err
return err
}

// update file offset
err = h.initFileOffset(f)
if err != nil {
return nil, err
return err
}

// yay, open file
h.file = source.File{f}
return encoding, nil
return nil
}

func (h *Harvester) initFileOffset(file *os.File) error {
offset, err := file.Seek(0, os.SEEK_CUR)

if h.getOffset() > 0 {
// continue from last known offset

logp.Debug("harvester",
"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset)
_, err = file.Seek(h.getOffset(), os.SEEK_SET)
} else if h.config.TailFiles {
// tail file if file is new and tail_files config is set
logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)

logp.Debug("harvester",
"harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
offset, err = file.Seek(0, os.SEEK_END)
h.SetOffset(offset)

Expand Down Expand Up @@ -290,40 +270,49 @@ func (h *Harvester) close() {
}
}

func createLineProcessor(
in source.FileSource,
codec encoding.Encoding,
bufferSize int,
maxBytes int,
readerConfig reader.LogFileReaderConfig,
jsonConfig *processor.JSONConfig,
mlrConfig *processor.MultilineConfig,
done chan struct{},
) (processor.LineProcessor, error) {
// newLogFileReaderConfig translates the harvester's own configuration
// into the reader.LogFileReaderConfig consumed by NewLogFileReader.
//
// TODO: NewLineReader uses additional buffering to deal with encoding and
// testing for new lines in the input stream. Simple 8-bit based encodings,
// or plain, don't require 'complicated' logic.
func (h *Harvester) newLogFileReaderConfig() reader.LogFileReaderConfig {
	cfg := h.config
	readerConfig := reader.LogFileReaderConfig{
		CloseRemoved:  cfg.CloseRemoved,
		CloseRenamed:  cfg.CloseRenamed,
		CloseOlder:    cfg.CloseOlder,
		CloseEOF:      cfg.CloseEOF,
		Backoff:       cfg.Backoff,
		MaxBackoff:    cfg.MaxBackoff,
		BackoffFactor: cfg.BackoffFactor,
	}
	return readerConfig
}

func (h *Harvester) newLineProcessor() (processor.LineProcessor, error) {

readerConfig := h.newLogFileReaderConfig()

var p processor.LineProcessor
var err error

fileReader, err := reader.NewLogFileReader(in, readerConfig, done)
fileReader, err := reader.NewLogFileReader(h.file, readerConfig, h.done)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having h.file available, we can initialize the encoding before this line. When initializing the encoding, the file pointer should still be at offset 0 though (in case the encoding uses a BOM). But why not set the file offset in newLineProcessor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be that we continue reading so the offset is not at 0. So that means if we init the encoding too late, we have a problem here?

About setting the offset: I don't think that is a processor's task, but the lines are still blurry TBH.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment about encoding handling here: #1913

if err != nil {
return nil, err
}

p, err = processor.NewLineEncoder(fileReader, codec, bufferSize)
p, err = processor.NewLineEncoder(fileReader, h.encoding, h.config.BufferSize)
if err != nil {
return nil, err
}

if jsonConfig != nil {
p = processor.NewJSONProcessor(p, jsonConfig)
if h.config.JSON != nil {
p = processor.NewJSONProcessor(p, h.config.JSON)
}

p = processor.NewStripNewline(p)
if mlrConfig != nil {
p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
if h.config.Multiline != nil {
p, err = processor.NewMultiline(p, "\n", h.config.MaxBytes, h.config.Multiline)
if err != nil {
return nil, err
}
}

return processor.NewLimitProcessor(p, maxBytes), nil
return processor.NewLimitProcessor(p, h.config.MaxBytes), nil
}
32 changes: 22 additions & 10 deletions filebeat/harvester/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,30 @@ func TestReadLine(t *testing.T) {
defer readFile.Close()
assert.Nil(t, err)

h := Harvester{}
f := source.File{readFile}

h := Harvester{
config: harvesterConfig{
CloseOlder: 500 * time.Millisecond,
Backoff: 100 * time.Millisecond,
MaxBackoff: 1 * time.Second,
BackoffFactor: 2,
BufferSize: 100,
MaxBytes: 1000,
},
file: f,
}
assert.NotNil(t, h)

// Read only 10 bytes which is not the end of the file
codec, _ := encoding.Plain(file)
readConfig := reader.LogFileReaderConfig{
CloseOlder: 500 * time.Millisecond,
BackoffDuration: 100 * time.Millisecond,
MaxBackoffDuration: 1 * time.Second,
BackoffFactor: 2,
}
r, _ := createLineProcessor(source.File{readFile}, codec, 100, 1000, readConfig, nil, nil, nil)
var ok bool
h.encodingFactory, ok = encoding.FindEncoding(h.config.Encoding)
assert.True(t, ok)

h.encoding, err = h.encodingFactory(readFile)
assert.NoError(t, err)

r, err := h.newLineProcessor()
assert.NoError(t, err)

// Read third line
_, text, bytesread, _, err := readLine(r)
Expand Down
Loading