From 0d147f729680f3205e2afd203239e86fc4b07b39 Mon Sep 17 00:00:00 2001 From: urso Date: Sat, 19 Dec 2015 04:50:34 +0100 Subject: [PATCH] Add support for filebeat multiline handling split processing into 3 layers: - input layer - line processing layer - event publisher layer (for loop driver) Input layer is responsible for reading files and forwarding errors if appropriate. new multiline system tests: - elasticsearch log with java exception - c-style log tests - max_lines test - max_bytes test - timeout test added asciidocs for multiline --- CHANGELOG.asciidoc | 1 + filebeat/config/config.go | 17 +- filebeat/crawler/prospector.go | 4 +- filebeat/docs/configuration.asciidoc | 47 +++- filebeat/etc/beat.yml | 33 ++- filebeat/etc/filebeat.yml | 33 ++- filebeat/harvester/harvester.go | 2 - filebeat/harvester/log.go | 217 +++++---------- filebeat/harvester/log_test.go | 58 ++-- filebeat/harvester/processor/multiline.go | 260 ++++++++++++++++++ .../harvester/processor/multiline_test.go | 126 +++++++++ filebeat/harvester/processor/processor.go | 109 ++++++++ .../harvester/processor/processor_test.go | 40 +++ filebeat/harvester/processor/timeout.go | 70 +++++ filebeat/harvester/reader.go | 166 +++++++++++ filebeat/harvester/util.go | 52 +--- .../logs/elasticsearch-multiline-log.log | 52 ++++ filebeat/tests/files/logs/multiline-c-log.log | 6 + filebeat/tests/system/config/filebeat.yml.j2 | 18 +- filebeat/tests/system/test_multiline.py | 184 +++++++++++++ 20 files changed, 1249 insertions(+), 246 deletions(-) create mode 100644 filebeat/harvester/processor/multiline.go create mode 100644 filebeat/harvester/processor/multiline_test.go create mode 100644 filebeat/harvester/processor/processor.go create mode 100644 filebeat/harvester/processor/processor_test.go create mode 100644 filebeat/harvester/processor/timeout.go create mode 100644 filebeat/harvester/reader.go create mode 100644 filebeat/tests/files/logs/elasticsearch-multiline-log.log create mode 100644 filebeat/tests/files/logs/multiline-c-log.log create mode 100644 filebeat/tests/system/test_multiline.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6b4ef7f0305..d47796f8fea 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] - group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496] *Filebeat* +- Add multiline support for combining multiple related lines into one event. 
{issue}461[461] *Winlogbeat* diff --git a/filebeat/config/config.go b/filebeat/config/config.go index db45ea305b2..af53d00abb9 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -67,9 +67,19 @@ type HarvesterConfig struct { BackoffFactor int `yaml:"backoff_factor"` MaxBackoff string `yaml:"max_backoff"` MaxBackoffDuration time.Duration - ForceCloseFiles bool `yaml:"force_close_files"` - ExcludeLines []string `yaml:"exclude_lines"` - IncludeLines []string `yaml:"include_lines"` + ForceCloseFiles bool `yaml:"force_close_files"` + ExcludeLines []string `yaml:"exclude_lines"` + IncludeLines []string `yaml:"include_lines"` + MaxBytes *int `yaml:"max_bytes"` + Multiline *MultilineConfig `yaml:"multiline"` +} + +type MultilineConfig struct { + Pattern string `yaml:"pattern"` + Negate bool `yaml:"negate"` + Match string `yaml:"match"` + MaxLines *int `yaml:"max_lines"` + Timeout string `yaml:"timeout"` } const ( @@ -157,5 +167,4 @@ func (config *Config) FetchConfigs() { if len(config.Filebeat.Prospectors) == 0 { log.Fatalf("No paths given. What files do you want me to watch?") } - } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index dfadc0d3f43..792ae477b8f 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -152,6 +152,8 @@ func (p *Prospector) logRun(spoolChan chan *input.FileEvent) { // Seed last scan time p.lastscan = time.Now() + logp.Debug("prospector", "exclude_files: %s", p.ProspectorConfig.ExcludeFiles) + // Now let's do one quick scan to pick up new files for _, path := range p.ProspectorConfig.Paths { p.scan(path, spoolChan) @@ -242,7 +244,7 @@ func (p *Prospector) isFileExcluded(file string) bool { func (p *Prospector) scan(path string, output chan *input.FileEvent) { logp.Debug("prospector", "scan path %s", path) - logp.Debug("prospector", "exclude_files: %s", p.ProspectorConfig.ExcludeFiles) + // Evaluate the path as a wildcards/shell glob matches, err := filepath.Glob(path) if err != nil { diff --git a/filebeat/docs/configuration.asciidoc b/filebeat/docs/configuration.asciidoc index 521e1debe25..683cd426779 100644 --- a/filebeat/docs/configuration.asciidoc +++ b/filebeat/docs/configuration.asciidoc @@ -93,7 +93,7 @@ If both `include_lines` and `exclude_lines` are defined, then include_lines is c ===== exclude_files -A list of regular expressions to match the files to be ignored. By default no file is excluded. +A list of regular expressions to match the files to be ignored. By default no file is excluded. [source,yaml] ------------------------------------------------------------------------------------- @@ -148,6 +148,51 @@ document. The default value is `log`. The buffer size every harvester uses when fetching the file. The default is 16384. +===== max_bytes + +Maximum number of bytes a single log event can have. All bytes after max_bytes are discarded and not sent. +This is especially useful for multiline log messages, which can grow large. The default is 10MB (10485760). + +===== multiline + +Multiline can be used for log messages spanning multiple lines. This is common for Java stack traces +or C-style line continuations. The following example combines all lines that follow a line starting with a `[`. +This could, for example, be a Java stack trace.
+ +[source,yaml] +------------------------------------------------------------------------------------- +multiline: + pattern: ^\[ + match: after + negate: true +------------------------------------------------------------------------------------- + +====== pattern + +The regexp pattern that has to be matched. The example pattern matches all lines starting with `[`. + +====== negate + +Defines whether the pattern set under pattern should be negated. The default is false. + +====== match + +Match must be set to "after" or "before". With "after", lines that match the pattern (or, if negate is set, +do not match it) are appended to the previous line. With "before", such lines are grouped with the line that +follows them. + +NOTE: "after" is equivalent to "previous" and "before" is equivalent to "next" in https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html[Logstash]. + +====== max_lines + +The maximum number of lines that are combined into one event. If an event contains more than max_lines, the +additional lines are discarded. The default is 500. + +====== timeout + +After the defined timeout, a multiline event is sent even if no new line matching the pattern was found to +start a new event. The default is 5s. + +===== tail_files diff --git a/filebeat/etc/beat.yml b/filebeat/etc/beat.yml index 64821d7da29..17d8bfb269e 100644 --- a/filebeat/etc/beat.yml +++ b/filebeat/etc/beat.yml @@ -32,11 +32,11 @@ filebeat: # Exclude lines. A list of regular expressions to match. It drops the lines that are # matching any regular expression from the list. The include_lines is called before - # exclude_lines. By default, no lines are dropped. + # exclude_lines. By default, no lines are dropped. # exclude_lines: ["^DBG"] # Include lines. A list of regular expressions to match. It exports the lines that are - # matching any regular expression from the list. The include_lines is called before + # matching any regular expression from the list. The include_lines is called before # exclude_lines. By default, all the lines are exported. # include_lines: ["^ERR", "^WARN"] @@ -73,6 +73,35 @@ filebeat: # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 + # Maximum number of bytes a single log event can have + # All bytes after max_bytes are discarded and not sent. The default is 10MB. + # This is especially useful for multiline log messages which can get large. + #max_bytes: 10485760 + + # Multiline can be used for log messages spanning multiple lines. This is common + # for Java stack traces or C-style line continuations + #multiline: + + # The regexp pattern that has to be matched. The example pattern matches all lines starting with [ + #pattern: ^\[ + + # Defines whether the pattern set under pattern should be negated. Default is false. + #negate: false + + # Match can be set to "after" or "before". It defines whether lines that match the pattern (or do not + # match it, if negate is set) are appended to the previous line or grouped with the following line. + # Note: "after" is equivalent to "previous" and "before" is equivalent to "next" in Logstash + #match: after + + # The maximum number of lines that are combined into one event. + # If an event contains more than max_lines, the additional lines are discarded. + # Default is 500 + #max_lines: 500 + + # After the defined timeout, a multiline event is sent even if no new line matching the pattern was found to start a new event + # Default is 5s. + #timeout: 5s + + # Setting tail_files to true means filebeat starts readding new files at the end # instead of the beginning. If this is used in combination with log rotation # this can mean that the first entries of a new file are skipped.
diff --git a/filebeat/etc/filebeat.yml b/filebeat/etc/filebeat.yml index 34c3dc5e824..09ac3386629 100644 --- a/filebeat/etc/filebeat.yml +++ b/filebeat/etc/filebeat.yml @@ -32,11 +32,11 @@ filebeat: # Exclude lines. A list of regular expressions to match. It drops the lines that are # matching any regular expression from the list. The include_lines is called before - # exclude_lines. By default, no lines are dropped. + # exclude_lines. By default, no lines are dropped. # exclude_lines: ["^DBG"] # Include lines. A list of regular expressions to match. It exports the lines that are - # matching any regular expression from the list. The include_lines is called before + # matching any regular expression from the list. The include_lines is called before # exclude_lines. By default, all the lines are exported. # include_lines: ["^ERR", "^WARN"] @@ -73,6 +73,35 @@ filebeat: # Defines the buffer size every harvester uses when fetching the file #harvester_buffer_size: 16384 + # Maximum number of bytes a single log event can have + # All bytes after max_bytes are discarded and not sent. The default is 10MB. + # This is especially useful for multiline log messages which can get large. + #max_bytes: 10485760 + + # Multiline can be used for log messages spanning multiple lines. This is common + # for Java stack traces or C-style line continuations + #multiline: + + # The regexp pattern that has to be matched. The example pattern matches all lines starting with [ + #pattern: ^\[ + + # Defines whether the pattern set under pattern should be negated. Default is false. + #negate: false + + # Match can be set to "after" or "before". It defines whether lines that match the pattern (or do not + # match it, if negate is set) are appended to the previous line or grouped with the following line. + # Note: "after" is equivalent to "previous" and "before" is equivalent to "next" in Logstash + #match: after + + # The maximum number of lines that are combined into one event. + # If an event contains more than max_lines, the additional lines are discarded. + # Default is 500 + #max_lines: 500 + + # After the defined timeout, a multiline event is sent even if no new line matching the pattern was found to start a new event + # Default is 5s. + #timeout: 5s + + # Setting tail_files to true means filebeat starts readding new files at the end # instead of the beginning. If this is used in combination with log rotation # this can mean that the first entries of a new file are skipped.
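To make the pattern, negate and match semantics documented above concrete, here is a minimal, self-contained Go sketch. It is illustrative only and not part of this patch; it hard-codes the documented example configuration (pattern `^\[`, negate: true, match: after) and prints which input lines end up grouped into one event.

[source,go]
-------------------------------------------------------------------------------------
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Example from the docs: pattern ^\[ with negate: true and match: after.
	// A line that does NOT start with '[' is appended to the event started by
	// the previous line that does.
	pattern := regexp.MustCompile(`^\[`)
	negate := true

	appendToPrevious := func(line string) bool {
		matched := pattern.MatchString(line)
		if negate {
			matched = !matched
		}
		return matched
	}

	lines := []string{
		"[2015-12-06 01:44:53] MergeMappingException[Merge failed]",
		"    at org.elasticsearch.cluster.metadata.MetaDataMappingService",
		"    at java.lang.Thread.run(Thread.java:745)",
		"[2015-12-06 01:44:54] next event",
	}

	var event []string
	flush := func() {
		if len(event) > 0 {
			fmt.Printf("event with %d line(s): %q\n", len(event), event)
			event = nil
		}
	}
	for _, line := range lines {
		// Lines matching the (negated) pattern are appended to the buffered
		// event; any other line flushes it and starts a new one.
		if len(event) == 0 || !appendToPrevious(line) {
			flush()
		}
		event = append(event, line)
	}
	flush() // filebeat flushes the last event on timeout or when the file ends
}
-------------------------------------------------------------------------------------

The actual implementation in filebeat/harvester/processor/multiline.go below expresses the same decision as a matcher predicate over the last and current line, compiles the pattern with regexp.CompilePOSIX, and additionally enforces max_lines, max_bytes and the flush timeout.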
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index d63e348ecbf..842de098128 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -17,7 +17,6 @@ import ( "io" "os" "regexp" - "time" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" @@ -33,7 +32,6 @@ type Harvester struct { SpoolerChan chan *input.FileEvent encoding encoding.EncodingFactory file FileSource /* the file being watched */ - backoff time.Duration ExcludeLinesRegexp []*regexp.Regexp IncludeLinesRegexp []*regexp.Regexp } diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index e7efa22a3e4..608b81670ba 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -5,15 +5,19 @@ import ( "fmt" "io" "os" - "time" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" + "github.com/elastic/beats/filebeat/harvester/processor" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/logp" "golang.org/x/text/transform" ) +const ( + defaultMaxBytes = 10 * (1 << 20) // 10MB +) + func NewHarvester( prospectorCfg config.ProspectorConfig, cfg *config.HarvesterConfig, @@ -34,7 +38,6 @@ func NewHarvester( Stat: stat, SpoolerChan: spooler, encoding: encoding, - backoff: prospectorCfg.Harvester.BackoffDuration, } h.ExcludeLinesRegexp, err = InitRegexps(cfg.ExcludeLines) if err != nil { @@ -47,9 +50,42 @@ func NewHarvester( return h, nil } +func createLineReader( + in FileSource, + codec encoding.Encoding, + bufferSize int, + maxBytes int, + readerConfig logFileReaderConfig, + mlrConfig *config.MultilineConfig, +) (processor.LineProcessor, error) { + var p processor.LineProcessor + var err error + + fileReader, err := newLogFileReader(in, readerConfig) + if err != nil { + return nil, err + } + + p, err = processor.NewLineSource(fileReader, codec, bufferSize) + if err != nil { + return nil, err + } + + if mlrConfig != nil { + p, err = processor.NewMultiline(p, maxBytes, mlrConfig) + if err != nil { + return nil, err + } + + return processor.NewStripNewline(p), nil + } + + p = processor.NewStripNewline(p) + return processor.NewLimitProcessor(p, maxBytes), nil +} + // Log harvester reads files line by line and sends events to the defined output func (h *Harvester) Harvest() { - defer func() { // On completion, push offset so we can continue where we left off if we relaunch on the same file h.Stat.Return <- h.Offset @@ -79,45 +115,53 @@ func (h *Harvester) Harvest() { // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain // don't require 'complicated' logic. - timedIn := newTimedReader(h.file) - reader, err := encoding.NewLineReader(timedIn, enc, h.Config.BufferSize) + config := h.Config + readerConfig := logFileReaderConfig{ + forceClose: config.ForceCloseFiles, + maxInactive: h.ProspectorConfig.IgnoreOlderDuration, + backoffDuration: config.BackoffDuration, + maxBackoffDuration: config.MaxBackoffDuration, + backoffFactor: config.BackoffFactor, + } + + maxBytes := defaultMaxBytes + if config.MaxBytes != nil { + maxBytes = *config.MaxBytes + } + + reader, err := createLineReader( + h.file, enc, config.BufferSize, maxBytes, readerConfig, config.Multiline) if err != nil { logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err) return } - // XXX: lastReadTime handling last time a full line was read only? 
- // timedReader provides timestamp some bytes have actually been read from file - lastReadTime := time.Now() - for { // Partial lines return error and are only read on completion - text, bytesRead, err := readLine(reader, &timedIn.lastReadTime) - + ts, text, bytesRead, err := readLine(reader) if err != nil { - - // In case of err = io.EOF returns nil - err = h.handleReadlineError(lastReadTime, err) - - // Return in case of error which leads to stopping harvester and closing file - if err != nil { - logp.Info("Read line error: %s", err) - return + if err == errFileTruncate { + seeker, ok := h.file.(io.Seeker) + if !ok { + logp.Err("can not seek source") + return + } + + logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path) + + h.Offset = 0 + seeker.Seek(h.Offset, os.SEEK_SET) + continue } - continue + logp.Info("Read line error: %s", err) + return } - lastReadTime = time.Now() - - // Reset Backoff - h.backoff = h.Config.BackoffDuration - if h.shouldExportLine(text) { - // Sends text to spooler event := &input.FileEvent{ - ReadTime: lastReadTime, + ReadTime: ts, Source: &h.Path, InputType: h.Config.InputType, DocumentType: h.Config.DocumentType, @@ -159,21 +203,6 @@ func (h *Harvester) shouldExportLine(line string) bool { } -// backOff checks the backoff variable and sleeps for the given time -// It also recalculate and sets the next backoff duration -func (h *Harvester) backOff() { - // Wait before trying to read file which reached EOF again - time.Sleep(h.backoff) - - // Increment backoff up to maxBackoff - if h.backoff < h.Config.MaxBackoffDuration { - h.backoff = h.backoff * time.Duration(h.Config.BackoffFactor) - if h.backoff > h.Config.MaxBackoffDuration { - h.backoff = h.Config.MaxBackoffDuration - } - } -} - // open does open the file given under h.Path and assigns the file handler to h.file func (h *Harvester) open() (encoding.Encoding, error) { // Special handling that "-" means to read from standard input @@ -250,7 +279,6 @@ func (h *Harvester) initFileOffset(file *os.File) error { } else { // get offset from file in case of encoding factory was // required to read some data. - logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset) h.Offset = offset } @@ -258,110 +286,7 @@ func (h *Harvester) initFileOffset(file *os.File) error { return err } -// handleReadlineError handles error which are raised during reading file. -// -// If error is EOF, it will check for: -// * File truncated -// * Older then ignore_older -// * General file error -// -// If none of the above cases match, no error will be returned and file is kept open -// -// In case of a general error, the error itself is returned -func (h *Harvester) handleReadlineError(lastTimeRead time.Time, err error) error { - if err != io.EOF || !h.file.Continuable() { - logp.Err("Unexpected state reading from %s; error: %s", h.Path, err) - return err - } - - // Refetch fileinfo to check if the file was truncated or disappeared. - // Errors if the file was removed/rotated after reading and before - // calling the stat function - info, statErr := h.file.Stat() - if statErr != nil { - logp.Err("Unexpected error reading from %s; error: %s", h.Path, statErr) - return statErr - } - - // Handle fails if file was truncated - if info.Size() < h.Offset { - seeker, ok := h.file.(io.Seeker) - if !ok { - logp.Err("Can not seek source") - return err - } - - logp.Debug("harvester", "File was truncated as offset (%s) > size (%s). 
Begin reading file from offset 0: %s", h.Offset, info.Size(), h.Path) - - h.Offset = 0 - seeker.Seek(h.Offset, os.SEEK_SET) - return nil - } - - age := time.Since(lastTimeRead) - if age > h.ProspectorConfig.IgnoreOlderDuration { - // If the file hasn't change for longer the ignore_older, harvester stops - // and file handle will be closed. - return fmt.Errorf("Stop harvesting as file is older then ignore_older: %s; Last change was: %s ", h.Path, age) - } - - if h.Config.ForceCloseFiles { - // Check if the file name exists (see #93) - _, statErr := os.Stat(h.file.Name()) - - // Error means file does not exist. If no error, check if same file. If not close as rotated. - if statErr != nil || !input.IsSameFile(h.file.Name(), info) { - logp.Info("Force close file: %s; error: %s", h.Path, statErr) - // Return directly on windows -> file is closing - return fmt.Errorf("Force closing file: %s", h.Path) - } - } - - if err != io.EOF { - logp.Err("Unexpected state reading from %s; error: %s", h.Path, err) - } - - logp.Debug("harvester", "End of file reached: %s; Backoff now.", h.Path) - - // Do nothing in case it is just EOF, keep reading the file after backing off - h.backOff() - return nil -} - func (h *Harvester) Stop() { } const maxConsecutiveEmptyReads = 100 - -// timedReader keeps track of last time bytes have been read from underlying -// reader. -type timedReader struct { - reader io.Reader - lastReadTime time.Time // last time we read some data from input stream -} - -func newTimedReader(reader io.Reader) *timedReader { - r := &timedReader{ - reader: reader, - } - return r -} - -func (r *timedReader) Read(p []byte) (int, error) { - var err error - n := 0 - - for i := maxConsecutiveEmptyReads; i > 0; i-- { - n, err = r.reader.Read(p) - if n > 0 { - r.lastReadTime = time.Now() - break - } - - if err != nil { - break - } - } - - return n, err -} diff --git a/filebeat/harvester/log_test.go b/filebeat/harvester/log_test.go index 8ccf28b269e..29ed46580b0 100644 --- a/filebeat/harvester/log_test.go +++ b/filebeat/harvester/log_test.go @@ -1,12 +1,13 @@ package harvester import ( - "io" + "fmt" "math/rand" "os" "path/filepath" "strconv" "testing" + "time" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/stretchr/testify/assert" @@ -54,62 +55,35 @@ func TestReadLine(t *testing.T) { assert.NotNil(t, h) // Read only 10 bytes which is not the end of the file - timedIn := newTimedReader(readFile) codec, _ := encoding.Plain(file) - reader, _ := encoding.NewLineReader(timedIn, codec, 100) + readConfig := logFileReaderConfig{ + maxInactive: 500 * time.Millisecond, + backoffDuration: 100 * time.Millisecond, + maxBackoffDuration: 1 * time.Second, + backoffFactor: 2, + } + reader, _ := createLineReader(fileSource{readFile}, codec, 100, 1000, readConfig, nil) // Read third line - text, bytesread, err := readLine(reader, &timedIn.lastReadTime) - + _, text, bytesread, err := readLine(reader) + fmt.Printf("received line: '%s'\n", text) assert.Nil(t, err) assert.Equal(t, text, firstLineString[0:len(firstLineString)-1]) assert.Equal(t, bytesread, len(firstLineString)) // read second line - text, bytesread, err = readLine(reader, &timedIn.lastReadTime) - + _, text, bytesread, err = readLine(reader) + fmt.Printf("received line: '%s'\n", text) assert.Equal(t, text, secondLineString[0:len(secondLineString)-1]) assert.Equal(t, bytesread, len(secondLineString)) assert.Nil(t, err) // Read third line, which doesn't exist - text, bytesread, err = readLine(reader, &timedIn.lastReadTime) + _, text, bytesread, 
err = readLine(reader) + fmt.Printf("received line: '%s'\n", text) assert.Equal(t, "", text) assert.Equal(t, bytesread, 0) - assert.Equal(t, err, io.EOF) -} - -func TestIsLine(t *testing.T) { - notLine := []byte("This is not a line") - assert.False(t, isLine(notLine)) - - notLine = []byte("This is not a line\n\r") - assert.False(t, isLine(notLine)) - - notLine = []byte("This is \n not a line") - assert.False(t, isLine(notLine)) - - line := []byte("This is a line \n") - assert.True(t, isLine(line)) - - line = []byte("This is a line\r\n") - assert.True(t, isLine(line)) -} - -func TestLineEndingChars(t *testing.T) { - - line := []byte("Not ending line") - assert.Equal(t, 0, lineEndingChars(line)) - - line = []byte("N ending \n") - assert.Equal(t, 1, lineEndingChars(line)) - - line = []byte("RN ending \r\n") - assert.Equal(t, 2, lineEndingChars(line)) - - // This is an invalid option - line = []byte("NR ending \n\r") - assert.Equal(t, 0, lineEndingChars(line)) + assert.Equal(t, err, errInactive) } func TestExcludeLine(t *testing.T) { diff --git a/filebeat/harvester/processor/multiline.go b/filebeat/harvester/processor/multiline.go new file mode 100644 index 00000000000..bc76a0573a5 --- /dev/null +++ b/filebeat/harvester/processor/multiline.go @@ -0,0 +1,260 @@ +package processor + +import ( + "errors" + "fmt" + "regexp" + "time" + + "github.com/elastic/beats/filebeat/config" +) + +// MultiLine processor combining multiple line events into one multi-line event. +// +// Lines to be combined are matched by some configurable predicate using +// regular expression. +// +// The maximum number of bytes and lines to be returned is fully configurable. +// Even if limits are reached subsequent lines are matched, until event is +// fully finished. +// +// Errors will force the multiline processor to return the currently active +// multiline event first and finally return the actual error on next call to Next. +type MultiLine struct { + reader LineProcessor + pred matcher + maxBytes int // bytes stored in content + maxLines int + + ts time.Time + content []byte + last []byte + readBytes int // bytes as read from input source + numLines int + + err error // last seen error + state func(*MultiLine) (Line, error) +} + +const ( + // Default maximum number of lines to return in one multi-line event + defaultMaxLines = 500 + + // Default timeout to finish a multi-line event. + defaultMultilineTimeout = 5 * time.Second +) + +// Matcher represents the predicate comparing any two lines +// to find start and end of multiline events in stream of line events. +type matcher func(last, current []byte) bool + +var ( + errMultilineTimeout = errors.New("multline timeout") +) + +// NewMultiline creates a new multi-line processor combining stream of +// line events into stream of multi-line events. 
+func NewMultiline( + r LineProcessor, + maxBytes int, + config *config.MultilineConfig, +) (*MultiLine, error) { + type matcherFactory func(pattern string) (matcher, error) + types := map[string]matcherFactory{ + "before": beforeMatcher, + "after": afterMatcher, + } + + matcherType, ok := types[config.Match] + if !ok { + return nil, fmt.Errorf("unknown matcher type: %s", config.Match) + } + + matcher, err := matcherType(config.Pattern) + if err != nil { + return nil, err + } + + if config.Negate { + matcher = negatedMatcher(matcher) + } + + maxLines := defaultMaxLines + if config.MaxLines != nil { + maxLines = *config.MaxLines + } + + timeout := defaultMultilineTimeout + if config.Timeout != "" { + timeout, err = time.ParseDuration(config.Timeout) + if err != nil { + return nil, fmt.Errorf("failed to parse duration '%s': %v", config.Timeout, err) + } + if timeout < 0 { + return nil, fmt.Errorf("timeout %v must not be negative", config.Timeout) + } + } + + if timeout > 0 { + r = newTimeoutProcessor(r, errMultilineTimeout, timeout) + } + + mlr := &MultiLine{ + reader: r, + pred: matcher, + state: (*MultiLine).readNext, + maxBytes: maxBytes, + maxLines: maxLines, + } + return mlr, nil +} + +// Next returns next multi-line event. +func (mlr *MultiLine) Next() (Line, error) { + return mlr.state(mlr) +} + +func (mlr *MultiLine) readNext() (Line, error) { + for { + l, err := mlr.reader.Next() + if err != nil { + // handle multiline timeout signal + if err == errMultilineTimeout { + // no lines buffered -> ignore timeout + if mlr.numLines == 0 { + continue + } + + // return collected multiline event and + // empty buffer for new multiline event + l := mlr.pushLine() + return l, nil + } + + // handle error without any bytes returned from reader + if l.Bytes == 0 { + // no lines buffered -> return error + if mlr.numLines == 0 { + return Line{}, err + } + + // lines buffered, return multiline and error on next read + l := mlr.pushLine() + mlr.err = err + mlr.state = (*MultiLine).readFailed + return l, nil + } + + // handle error with some content being returned by reader and + // line matching multiline criteria or no multiline started yet + if mlr.readBytes == 0 || mlr.pred(mlr.last, l.Content) { + mlr.addLine(l) + + // return multiline and error on next read + l := mlr.pushLine() + mlr.err = err + mlr.state = (*MultiLine).readFailed + return l, nil + } + + // no match, return current multline and retry with current line on next + // call to readNext awaiting the error being reproduced (or resolved) + // in next call to Next + l := mlr.startNewLine(l) + return l, nil + } + + // if predicate does not match current multiline -> return multiline event + if mlr.readBytes > 0 && !mlr.pred(mlr.last, l.Content) { + l := mlr.startNewLine(l) + return l, nil + } + + // add line to current multiline event + mlr.addLine(l) + } +} + +func (mlr *MultiLine) readFailed() (Line, error) { + // return error and reset line reader + err := mlr.err + mlr.err = nil + mlr.state = (*MultiLine).readNext + return Line{}, err +} + +func (mlr *MultiLine) startNewLine(l Line) Line { + retLine := mlr.pushLine() + mlr.addLine(l) + mlr.ts = l.Ts + return retLine +} + +func (mlr *MultiLine) pushLine() Line { + content := mlr.content + sz := mlr.readBytes + + mlr.content = nil + mlr.last = nil + mlr.readBytes = 0 + mlr.numLines = 0 + mlr.err = nil + + return Line{Ts: mlr.ts, Content: content, Bytes: sz} +} + +func (mlr *MultiLine) addLine(l Line) { + if l.Bytes <= 0 { + return + } + + space := mlr.maxBytes - len(mlr.content) + 
spaceLeft := (mlr.maxBytes <= 0 || space > 0) && + (mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines) + if spaceLeft { + if space < 0 || space > len(l.Content) { + space = len(l.Content) + } + mlr.content = append(mlr.content, l.Content[:space]...) + mlr.numLines++ + } + + mlr.last = l.Content + mlr.readBytes += l.Bytes +} + +// matchers + +func afterMatcher(pattern string) (matcher, error) { + return genPatternMatcher(pattern, func(last, current []byte) []byte { + return current + }) +} + +func beforeMatcher(pattern string) (matcher, error) { + return genPatternMatcher(pattern, func(last, current []byte) []byte { + return last + }) +} + +func negatedMatcher(m matcher) matcher { + return func(last, current []byte) bool { + return !m(last, current) + } +} + +func genPatternMatcher( + pattern string, + sel func(last, current []byte) []byte, +) (matcher, error) { + reg, err := regexp.CompilePOSIX(pattern) + if err != nil { + return nil, err + } + + matcher := func(last, current []byte) bool { + line := sel(last, current) + return reg.Match(line) + } + return matcher, nil +} diff --git a/filebeat/harvester/processor/multiline_test.go b/filebeat/harvester/processor/multiline_test.go new file mode 100644 index 00000000000..f5d575f0661 --- /dev/null +++ b/filebeat/harvester/processor/multiline_test.go @@ -0,0 +1,126 @@ +package processor + +import ( + "bytes" + "errors" + "os" + "testing" + + "github.com/elastic/beats/filebeat/config" + "github.com/elastic/beats/filebeat/harvester/encoding" + "github.com/stretchr/testify/assert" +) + +type bufferSource struct{ buf *bytes.Buffer } + +func (p bufferSource) Read(b []byte) (int, error) { return p.buf.Read(b) } +func (p bufferSource) Close() error { return nil } +func (p bufferSource) Name() string { return "buffer" } +func (p bufferSource) Stat() (os.FileInfo, error) { return nil, errors.New("unknown") } +func (p bufferSource) Continuable() bool { return false } + +func TestMultilineAfterOK(t *testing.T) { + testMultilineOK(t, + config.MultilineConfig{ + Pattern: "^[ \t] +", // next line is indented by spaces + Match: "after", + }, + "line1\n line1.1\n line1.2\n", + "line2\n line2.1\n line2.2\n", + ) +} + +func TestMultilineBeforeOK(t *testing.T) { + testMultilineOK(t, + config.MultilineConfig{ + Pattern: "\\\\$", // previous line ends with \ + Match: "before", + }, + "line1 \\\nline1.1 \\\nline1.2\n", + "line2 \\\nline2.1 \\\nline2.2\n", + ) +} + +func TestMultilineAfterNegateOK(t *testing.T) { + testMultilineOK(t, + config.MultilineConfig{ + Pattern: "^-", // first line starts with '-' at beginning of line + Negate: true, + Match: "after", + }, + "-line1\n - line1.1\n - line1.2\n", + "-line2\n - line2.1\n - line2.2\n", + ) +} + +func TestMultilineBeforeNegateOK(t *testing.T) { + testMultilineOK(t, + config.MultilineConfig{ + Pattern: ";$", // last line ends with ';' + Negate: true, + Match: "before", + }, + "line1\nline1.1\nline1.2;\n", + "line2\nline2.1\nline2.2;\n", + ) +} + +func testMultilineOK(t *testing.T, cfg config.MultilineConfig, expected ...string) { + _, buf := createLineBuffer(expected...) 
+ reader := createMultilineTestReader(t, buf, cfg) + + var lines []string + var sizes []int + for { + line, err := reader.Next() + if err != nil { + break + } + + lines = append(lines, string(line.Content)) + sizes = append(sizes, line.Bytes) + } + + if len(lines) != len(expected) { + t.Fatalf("expected %v lines, read only %v line(s)", len(expected), len(lines)) + } + + for i, line := range lines { + expected := expected[i] + assert.Equal(t, line, expected) + assert.Equal(t, sizes[i], len(expected)) + } +} + +func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg config.MultilineConfig) LineProcessor { + encFactory, ok := encoding.FindEncoding("plain") + if !ok { + t.Fatalf("unable to find 'plain' encoding") + } + + enc, err := encFactory(in) + if err != nil { + t.Fatalf("failed to initialize encoding: %v", err) + } + + var reader LineProcessor + reader, err = NewLineSource(in, enc, 4096) + if err != nil { + t.Fatalf("Failed to initialize line reader: %v", err) + } + + reader, err = NewMultiline(reader, 1<<20, &cfg) + if err != nil { + t.Fatalf("failed to initializ reader: %v", err) + } + + return reader +} + +func createLineBuffer(lines ...string) ([]string, *bytes.Buffer) { + buf := bytes.NewBuffer(nil) + for _, line := range lines { + buf.WriteString(line) + } + return lines, buf +} diff --git a/filebeat/harvester/processor/processor.go b/filebeat/harvester/processor/processor.go new file mode 100644 index 00000000000..4e6b7daf6c0 --- /dev/null +++ b/filebeat/harvester/processor/processor.go @@ -0,0 +1,109 @@ +package processor + +import ( + "io" + "time" + + "github.com/elastic/beats/filebeat/harvester/encoding" +) + +// Line represents a line event with timestamp, content and actual number +// of bytes read from input before decoding. +type Line struct { + Ts time.Time // timestamp the line was read + Content []byte // actual line read + Bytes int // total number of bytes read to generate the line +} + +// LineProcessor is the interface that wraps the basic Next method for +// getting a new line. +// Next returns the line being read or and error. EOF is returned +// if processor will not return any new lines on subsequent calls. +type LineProcessor interface { + Next() (Line, error) +} + +// LineSource produces lines by reading lines from an io.Reader +// through a decoder converting the reader it's encoding to utf-8. +type LineSource struct { + reader *encoding.LineReader +} + +// StripNewline processor removes the last trailing newline characters from +// read lines. +type StripNewline struct { + reader LineProcessor +} + +// LimitProcessor sets an upper limited on line length. Lines longer +// then the max configured line length will be snapped short. +type LimitProcessor struct { + reader LineProcessor + maxBytes int +} + +// NewLineSource creates a new LineSource from input reader by applying +// the given codec. +func NewLineSource( + in io.Reader, + codec encoding.Encoding, + bufferSize int, +) (LineSource, error) { + r, err := encoding.NewLineReader(in, codec, bufferSize) + return LineSource{r}, err +} + +// Next reads the next line from it's initial io.Reader +func (p LineSource) Next() (Line, error) { + c, sz, err := p.reader.Next() + return Line{Ts: time.Now(), Content: c, Bytes: sz}, err +} + +// NewStripNewline creates a new line reader stripping the last tailing newline. +func NewStripNewline(r LineProcessor) *StripNewline { + return &StripNewline{r} +} + +// Next returns the next line. 
+func (p *StripNewline) Next() (Line, error) { + line, err := p.reader.Next() + if err != nil { + return line, err + } + + L := line.Content + line.Content = L[:len(L)-lineEndingChars(L)] + return line, err +} + +// NewLimitProcessor creates a new processor limiting the line length. +func NewLimitProcessor(in LineProcessor, maxBytes int) *LimitProcessor { + return &LimitProcessor{reader: in, maxBytes: maxBytes} +} + +// Next returns the next line. +func (p *LimitProcessor) Next() (Line, error) { + line, err := p.reader.Next() + if len(line.Content) > p.maxBytes { + line.Content = line.Content[:p.maxBytes] + } + return line, err +} + +// isLine checks if the given byte array is a line, means has a line ending \n +func isLine(l []byte) bool { + return l != nil && len(l) > 0 && l[len(l)-1] == '\n' +} + +// lineEndingChars returns the number of line ending chars the given by array has +// In case of Unix/Linux files, it is -1, in case of Windows mostly -2 +func lineEndingChars(l []byte) int { + if !isLine(l) { + return 0 + } + + if len(l) > 1 && l[len(l)-2] == '\r' { + return 2 + } + return 1 +} diff --git a/filebeat/harvester/processor/processor_test.go b/filebeat/harvester/processor/processor_test.go new file mode 100644 index 00000000000..b6b6d661fb2 --- /dev/null +++ b/filebeat/harvester/processor/processor_test.go @@ -0,0 +1,40 @@ +package processor + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsLine(t *testing.T) { + notLine := []byte("This is not a line") + assert.False(t, isLine(notLine)) + + notLine = []byte("This is not a line\n\r") + assert.False(t, isLine(notLine)) + + notLine = []byte("This is \n not a line") + assert.False(t, isLine(notLine)) + + line := []byte("This is a line \n") + assert.True(t, isLine(line)) + + line = []byte("This is a line\r\n") + assert.True(t, isLine(line)) +} + +func TestLineEndingChars(t *testing.T) { + + line := []byte("Not ending line") + assert.Equal(t, 0, lineEndingChars(line)) + + line = []byte("N ending \n") + assert.Equal(t, 1, lineEndingChars(line)) + + line = []byte("RN ending \r\n") + assert.Equal(t, 2, lineEndingChars(line)) + + // This is an invalid option + line = []byte("NR ending \n\r") + assert.Equal(t, 0, lineEndingChars(line)) +} diff --git a/filebeat/harvester/processor/timeout.go b/filebeat/harvester/processor/timeout.go new file mode 100644 index 00000000000..33776a2e173 --- /dev/null +++ b/filebeat/harvester/processor/timeout.go @@ -0,0 +1,70 @@ +package processor + +import ( + "errors" + "time" +) + +var ( + errTimeout = errors.New("timeout") +) + +// timeoutProcessor will signal some configurable timeout error if no +// new line can be returned in time. +type timeoutProcessor struct { + reader LineProcessor + timeout time.Duration + signal error + + running bool + ch chan lineMessage +} + +type lineMessage struct { + line Line + err error +} + +// newTimeoutProcessor returns a new timeoutProcessor from an input line processor. +func newTimeoutProcessor(in LineProcessor, signal error, timeout time.Duration) *timeoutProcessor { + if signal == nil { + signal = errTimeout + } + + return &timeoutProcessor{ + reader: in, + signal: signal, + timeout: timeout, + ch: make(chan lineMessage, 1), + } +} + +// Next returns the next line. If no line was returned before timeout, the +// configured timeout error is returned. +// For handline timeouts a goroutine is started for reading lines from +// configured line processor. Only when underlying processor returns an error, the +// goroutine will be finished. 
+func (p *timeoutProcessor) Next() (Line, error) { + if !p.running { + p.running = true + go func() { + for { + line, err := p.reader.Next() + p.ch <- lineMessage{line, err} + if err != nil { + break + } + } + }() + } + + select { + case msg := <-p.ch: + if msg.err != nil { + p.running = false + } + return msg.line, msg.err + case <-time.After(p.timeout): + return Line{}, p.signal + } +} diff --git a/filebeat/harvester/reader.go b/filebeat/harvester/reader.go new file mode 100644 index 00000000000..a1b9ae0f273 --- /dev/null +++ b/filebeat/harvester/reader.go @@ -0,0 +1,166 @@ +package harvester + +import ( + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/libbeat/logp" +) + +type logFileReader struct { + fs FileSource + offset int64 + config logFileReaderConfig + truncated bool + + lastTimeRead time.Time + backoff time.Duration +} + +type logFileReaderConfig struct { + forceClose bool + maxInactive time.Duration + backoffDuration time.Duration + maxBackoffDuration time.Duration + backoffFactor int +} + +var ( + errFileTruncate = errors.New("detected file being truncated") + errForceClose = errors.New("file must be closed") + errInactive = errors.New("file inactive") +) + +func newLogFileReader( + fs FileSource, + config logFileReaderConfig, +) (*logFileReader, error) { + var offset int64 + if seeker, ok := fs.(io.Seeker); ok { + var err error + offset, err = seeker.Seek(0, os.SEEK_CUR) + if err != nil { + return nil, err + } + } + + return &logFileReader{ + fs: fs, + offset: offset, + config: config, + lastTimeRead: time.Now(), + backoff: config.backoffDuration, + }, nil +} + +func (r *logFileReader) Read(buf []byte) (int, error) { + fmt.Println("call Read") + + if r.truncated { + var offset int64 + if seeker, ok := r.fs.(io.Seeker); ok { + var err error + offset, err = seeker.Seek(0, os.SEEK_CUR) + if err != nil { + return 0, err + } + } + r.offset = offset + r.truncated = false + } + + for { + n, err := r.fs.Read(buf) + if n > 0 { + fmt.Printf("did read(%v): '%s'\n", n, buf[:n]) + + r.offset += int64(n) + r.lastTimeRead = time.Now() + } + if err == nil { + // reset backoff + r.backoff = r.config.backoffDuration + fmt.Printf("return size: %v\n", n) + return n, nil + } + + continuable := r.fs.Continuable() + fmt.Printf("error: %v, continuable: %v\n", err, continuable) + + if err == io.EOF && !continuable { + logp.Info("Reached end of file: %s", r.fs.Name()) + return n, err + } + + if err != io.EOF || !continuable { + logp.Err("Unexpected state reading from %s; error: %s", r.fs.Name(), err) + return n, err + } + + // Refetch fileinfo to check if the file was truncated or disappeared. + // Errors if the file was removed/rotated after reading and before + // calling the stat function + info, statErr := r.fs.Stat() + if statErr != nil { + logp.Err("Unexpected error reading from %s; error: %s", r.fs.Name(), statErr) + return n, statErr + } + + // handle fails if file was truncated + if info.Size() < r.offset { + logp.Debug("harvester", + "File was truncated as offset (%s) > size (%s). Begin reading file from offset 0: %s", + r.offset, info.Size(), r.fs.Name()) + r.truncated = true + return n, errFileTruncate + } + + age := time.Since(r.lastTimeRead) + if age > r.config.maxInactive { + // If the file hasn't change for longer then maxInactive, harvester stops + // and file handle will be closed. 
return n, errInactive + } + + if r.config.forceClose { + // Check if the file name exists (see #93) + _, statErr := os.Stat(r.fs.Name()) + + // Error means file does not exist. If no error, check if same file. If + // not close as rotated. + if statErr != nil || !input.IsSameFile(r.fs.Name(), info) { + logp.Info("Force close file: %s; error: %s", r.fs.Name(), statErr) + // Return directly on windows -> file is closing + return n, errForceClose + } + } + + if err != io.EOF { + logp.Err("Unexpected state reading from %s; error: %s", r.fs.Name(), err) + } + + logp.Debug("harvester", "End of file reached: %s; Backoff now.", r.fs.Name()) + buf = buf[n:] + if len(buf) == 0 { + return n, nil + } + r.wait() + } +} + +func (r *logFileReader) wait() { + // Wait before trying to read file which reached EOF again + time.Sleep(r.backoff) + + // Increment backoff up to maxBackoff + if r.backoff < r.config.maxBackoffDuration { + r.backoff = r.backoff * time.Duration(r.config.backoffFactor) + if r.backoff > r.config.maxBackoffDuration { + r.backoff = r.config.maxBackoffDuration + } + } +} diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go index 2bc9d137f48..d57a9873fd2 100644 --- a/filebeat/harvester/util.go +++ b/filebeat/harvester/util.go @@ -4,63 +4,25 @@ import ( "regexp" "time" - "github.com/elastic/beats/filebeat/harvester/encoding" + "github.com/elastic/beats/filebeat/harvester/processor" "github.com/elastic/beats/libbeat/logp" ) -// isLine checks if the given byte array is a line, means has a line ending \n -func isLine(line []byte) bool { - if line == nil || len(line) == 0 { - return false - } - - if line[len(line)-1] != '\n' { - return false - } - return true -} - -// lineEndingChars returns the number of line ending chars the given by array has -// In case of Unix/Linux files, it is -1, in case of Windows mostly -2 -func lineEndingChars(line []byte) int { - if !isLine(line) { - return 0 - } - - if line[len(line)-1] == '\n' { - if len(line) > 1 && line[len(line)-2] == '\r' { - return 2 - } - - return 1 - } - return 0 -} - // readLine reads a full line into buffer and returns it. // In case of partial lines, readLine does return and error and en empty string // This could potentialy be improved / replaced by https://github.com/elastic/beats/libbeat/tree/master/common/streambuf -func readLine( - reader *encoding.LineReader, - lastReadTime *time.Time, -) (string, int, error) { +func readLine(reader processor.LineProcessor) (time.Time, string, int, error) { for { - line, size, err := reader.Next() + l, err := reader.Next() // Full line read to be returned - if size != 0 && err == nil { + if l.Bytes != 0 && err == nil { logp.Debug("harvester", "full line read") - return readlineString(line, size) - } else { - return "", 0, err + return l.Ts, string(l.Content), l.Bytes, err } - } -} -// readlineString removes line ending characters from given by array. -func readlineString(bytes []byte, size int) (string, int, error) { - s := string(bytes)[:len(bytes)-lineEndingChars(bytes)] - return s, size, nil + return time.Time{}, "", 0, err + } } // InitRegexps initializes a list of compiled regular expressions.
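The harvester changes above compose a chain of LineProcessor values: the file-backed logFileReader feeds an encoding-aware line source, which may be wrapped by the multiline combiner, followed by newline stripping and a byte limit. The following standalone sketch is modeled on createMultilineTestReader in multiline_test.go; it is illustrative only (error handling elided, buffer contents invented) and shows how such a chain can be assembled over an in-memory buffer and drained with Next().

[source,go]
-------------------------------------------------------------------------------------
package main

import (
	"bytes"
	"fmt"

	"github.com/elastic/beats/filebeat/config"
	"github.com/elastic/beats/filebeat/harvester/encoding"
	"github.com/elastic/beats/filebeat/harvester/processor"
)

func main() {
	buf := bytes.NewBufferString("line1\n  line1.1\n  line1.2\nline2\n")

	// Plain (pass-through) encoding, as used by the multiline tests.
	encFactory, _ := encoding.FindEncoding("plain")
	codec, _ := encFactory(buf)

	// Decode raw bytes into line events carrying timestamp, content and size.
	var p processor.LineProcessor
	p, _ = processor.NewLineSource(buf, codec, 4096)

	// Combine indented continuation lines with the line they follow.
	p, _ = processor.NewMultiline(p, 1<<20, &config.MultilineConfig{
		Pattern: "^[ \t]+", // continuation lines are indented
		Match:   "after",
	})

	// Drop the trailing newline of each combined event.
	p = processor.NewStripNewline(p)

	for {
		line, err := p.Next()
		if err != nil {
			break // the underlying reader's error (e.g. io.EOF) ends the loop
		}
		fmt.Printf("%d bytes consumed: %q\n", line.Bytes, string(line.Content))
	}
}
-------------------------------------------------------------------------------------

In the harvester itself this wiring is performed by createLineReader in log.go, with logFileReader from reader.go as the innermost io.Reader.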
diff --git a/filebeat/tests/files/logs/elasticsearch-multiline-log.log b/filebeat/tests/files/logs/elasticsearch-multiline-log.log new file mode 100644 index 00000000000..c47a22e642b --- /dev/null +++ b/filebeat/tests/files/logs/elasticsearch-multiline-log.log @@ -0,0 +1,52 @@ +[2015-12-06 01:44:16,735][INFO ][node ] [Zach] version[2.0.0], pid[48553], build[de54438/2015-10-22T08:09:48Z] +[2015-12-06 01:44:16,736][INFO ][node ] [Zach] initializing ... +[2015-12-06 01:44:16,804][INFO ][plugins ] [Zach] loaded [], sites [] +[2015-12-06 01:44:16,941][INFO ][env ] [Zach] using [1] data paths, mounts [[/ (/dev/disk1)]], net usable_space [66.3gb], net total_space [232.6gb], spins? [unknown], types [hfs] +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] initialized +[2015-12-06 01:44:19,177][INFO ][node ] [Zach] starting ... +[2015-12-06 01:44:19,356][INFO ][transport ] [Zach] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[fe80::1]:9300}, {[::1]:9300} +[2015-12-06 01:44:19,367][INFO ][discovery ] [Zach] elasticsearch/qfPw9z0HQe6grbJQruTCJQ +[2015-12-06 01:44:22,405][INFO ][cluster.service ] [Zach] new_master {Zach}{qfPw9z0HQe6grbJQruTCJQ}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received) +[2015-12-06 01:44:22,432][INFO ][http ] [Zach] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[fe80::1]:9200}, {[::1]:9200} +[2015-12-06 01:44:22,432][INFO ][node ] [Zach] started +[2015-12-06 01:44:22,446][INFO ][gateway ] [Zach] recovered [0] indices into cluster_state +[2015-12-06 01:44:52,882][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] creating index, cause [auto(bulk api)], templates [], shards [5]/[1], mappings [process, system] +[2015-12-06 01:44:53,256][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [process] +[2015-12-06 01:44:53,269][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,274][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][0] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vcVA0hoJdODMTz], source[{"@timestamp":"2015-12-06T00:44:52.448Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":1902,"user_p":0,"system":941,"total":2843,"start_time":"Dec03"},"mem":{"size":3616309248,"rss":156405760,"rss_p":0.01,"share":0},"name":"Google Chrome H","pid":40572,"ppid":392,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double]]}] + at 
org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,279][DEBUG][action.admin.indices.mapping.put] [Zach] failed to put mappings on indices [[filebeat-2015.12.06]], type [process] +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,280][DEBUG][action.bulk ] [Zach] [filebeat-2015.12.06][1] failed to execute bulk item (index) index {[filebeat-2015.12.06][process][AVF0v5vbVA0hoJdODMTj], source[{"@timestamp":"2015-12-06T00:44:52.416Z","beat":{"hostname":"ruflin","name":"ruflin"},"count":1,"proc":{"cpu":{"user":6643,"user_p":0.01,"system":693,"total":7336,"start_time":"01:44"},"mem":{"size":5182656512,"rss":248872960,"rss_p":0.01,"share":0},"name":"java","pid":48553,"ppid":48547,"state":"running"},"type":"process"}]} +MergeMappingException[Merge failed with failures {[mapper [proc.mem.rss_p] of different type, current_type [long], merged_type [double], mapper [proc.cpu.user_p] of different type, current_type [long], merged_type [double]]}] + at org.elasticsearch.cluster.metadata.MetaDataMappingService$2.execute(MetaDataMappingService.java:388) + at org.elasticsearch.cluster.service.InternalClusterService$UpdateTask.run(InternalClusterService.java:388) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:225) + at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:188) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) + at java.lang.Thread.run(Thread.java:745) +[2015-12-06 01:44:53,334][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] update_mapping [system] +[2015-12-06 01:44:53,646][INFO ][cluster.metadata ] [Zach] [filebeat-2015.12.06] create_mapping 
[filesystem] diff --git a/filebeat/tests/files/logs/multiline-c-log.log b/filebeat/tests/files/logs/multiline-c-log.log new file mode 100644 index 00000000000..a3684d35177 --- /dev/null +++ b/filebeat/tests/files/logs/multiline-c-log.log @@ -0,0 +1,6 @@ +The following are log messages +This is a C style log\\ +file which is on multiple\\ +lines +In addition it has normal lines +The total should be 4 lines covered diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 69ea994aa65..28e03525cbe 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -16,23 +16,39 @@ filebeat: backoff_factor: 1 max_backoff: 0.1s force_close_files: {{force_close_files}} + {% if fields %} fields: {% for k,v in fields.items() %} {{k}}: {{v}} {% endfor %} {% endif %} + fields_under_root: {{"true" if fieldsUnderRoot else "false"}} + {% if include_lines %} include_lines: {{include_lines}} {% endif %} + {% if exclude_lines %} exclude_lines: {{exclude_lines}} {% endif %} + {% if exclude_files %} exclude_files: {{exclude_files}} {% endif %} - + + max_bytes: {{ max_bytes|default(10485760) }} + + {% if multiline %} + multiline: + pattern: {{pattern}} + negate: {{negate}} + match: {{match}} + timeout: 1s + max_lines: {{ max_lines|default(500) }} + {% endif %} + spool_size: idle_timeout: 0.1s registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}} diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py new file mode 100644 index 00000000000..85204fbe2e8 --- /dev/null +++ b/filebeat/tests/system/test_multiline.py @@ -0,0 +1,184 @@ +from filebeat import TestCase +import os +import socket +import shutil + +""" +Tests for the multiline log messages +""" + + +class Test(TestCase): + def test_java_elasticsearch_log(self): + """ + Test that multi lines for java logs works. 
+ It checks that all lines which do not start with [ are append to the last line starting with [ + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after" + ) + + os.mkdir(self.working_dir + "/log/") + shutil.copy2("../files/logs/elasticsearch-multiline-log.log", os.path.abspath(self.working_dir) + "/log/elasticsearch-multiline-log.log") + + proc = self.start_filebeat() + + # wait for the "Skipping file" log message + self.wait_until( + lambda: self.output_has(lines=20), + max_timeout=10) + + proc.kill_and_wait() + + output = self.read_output() + + # Check that output file has the same number of lines as the log file + assert 20 == len(output) + + def test_c_style_log(self): + """ + Test that multi lines for c style log works + It checks that all lines following a line with \\ are appended to the previous line + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="\\\\$", + match="after" + ) + + os.mkdir(self.working_dir + "/log/") + shutil.copy2("../files/logs/multiline-c-log.log", os.path.abspath(self.working_dir) + "/log/multiline-c-log.log") + + proc = self.start_filebeat() + + # wait for the "Skipping file" log message + self.wait_until( + lambda: self.output_has(lines=4), + max_timeout=10) + + proc.kill_and_wait() + + output = self.read_output() + + # Check that output file has the same number of lines as the log file + assert 4 == len(output) + + def test_max_lines(self): + """ + Test the maximum number of lines that is sent by multiline + All further lines are discarded + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after", + max_lines=3 + ) + + os.mkdir(self.working_dir + "/log/") + shutil.copy2("../files/logs/elasticsearch-multiline-log.log", os.path.abspath(self.working_dir) + "/log/elasticsearch-multiline-log.log") + + proc = self.start_filebeat() + + self.wait_until( + lambda: self.output_has(lines=20), + max_timeout=10) + + proc.kill_and_wait() + + output = self.read_output() + + # Checks line 3 is sent + assert True == self.log_contains("MetaDataMappingService.java:388", "output/filebeat") + + # Checks line 4 is not sent anymore + assert False == self.log_contains("InternalClusterService.java:388", "output/filebeat") + + # Check that output file has the same number of lines as the log file + assert 20 == len(output) + + def test_timeout(self): + """ + Test that data is sent after timeout + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after", + ) + + os.mkdir(self.working_dir + "/log/") + + testfile = self.working_dir + "/log/test.log" + file = open(testfile, 'w', 0) + + file.write("[2015] hello world") + file.write("\n") + file.write(" First Line\n") + file.write(" Second Line\n") + + proc = self.start_filebeat() + + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=10) + + # Because of the timeout the following two lines should be put together + file.write(" This should not be third\n") + file.write(" This should not be fourth\n") + # This starts a new pattern + file.write("[2016] Hello world\n") + # This line should be appended + file.write(" First line again\n") + + self.wait_until( + lambda: self.output_has(lines=3), + max_timeout=10) + proc.kill_and_wait() + + output = self.read_output() 
assert 3 == len(output) + + def test_max_bytes(self): + """ + Test the maximum number of bytes that is sent + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + multiline=True, + pattern="^\[", + negate="true", + match="after", + max_bytes=60 + ) + + os.mkdir(self.working_dir + "/log/") + shutil.copy2("../files/logs/elasticsearch-multiline-log.log", os.path.abspath(self.working_dir) + "/log/elasticsearch-multiline-log.log") + + proc = self.start_filebeat() + + self.wait_until( + lambda: self.output_has(lines=20), + max_timeout=10) + + proc.kill_and_wait() + + output = self.read_output() + + # Check that the first 60 chars are sent + assert True == self.log_contains("cluster.metadata", "output/filebeat") + + # Check that chars afterwards are not sent + assert False == self.log_contains("Zach", "output/filebeat") + + # Check that output file has the same number of lines as the log file + assert 20 == len(output)
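One detail behind the max_lines and max_bytes tests above is worth spelling out: the multiline processor caps the content it emits, but keeps counting every byte it consumed, so the Bytes field of the emitted event still covers the discarded tail and offsets stay consistent. Below is a simplified sketch of that bookkeeping, mirroring MultiLine.addLine; it is illustrative only and not part of this patch.

[source,go]
-------------------------------------------------------------------------------------
package main

import "fmt"

// event accumulates multiline content, capped by maxBytes and maxLines,
// while consumed tracks every byte read from the file.
type event struct {
	maxBytes, maxLines int
	content            []byte
	numLines, consumed int
}

func (e *event) add(line []byte) {
	space := e.maxBytes - len(e.content)
	withinLimits := (e.maxBytes <= 0 || space > 0) &&
		(e.maxLines <= 0 || e.numLines < e.maxLines)
	if withinLimits {
		if space < 0 || space > len(line) {
			space = len(line)
		}
		e.content = append(e.content, line[:space]...)
		e.numLines++
	}
	e.consumed += len(line) // always counted, even when content is discarded
}

func main() {
	e := &event{maxBytes: 60, maxLines: 3}
	for _, l := range []string{
		"[2015-12-06 01:44:53] MergeMappingException[Merge failed]\n",
		"    at org.elasticsearch...MetaDataMappingService\n",
		"    at java.lang.Thread.run(Thread.java:745)\n",
		"    at some.other.Frame(Frame.java:1)\n",
	} {
		e.add([]byte(l))
	}
	fmt.Printf("content kept: %d bytes, %d lines\n", len(e.content), e.numLines)
	fmt.Printf("bytes consumed: %d\n", e.consumed)
}
-------------------------------------------------------------------------------------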