From de395d1a669cc91aa5e9896feb4dc14f80985662 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?=
Date: Tue, 6 Oct 2020 08:14:06 +0200
Subject: [PATCH] Add filestream input reader (#21481) (#21538)

This PR adds the event reader and publisher functionality. It is mostly
a refactoring of the `Harvester` from `filebeat/input/log`. Two things
are still missing: metrics and special readers, e.g. `multiline`.

(cherry picked from commit 40b24dd2d00bb2a83b46449e452f4b7f86d59087)
---
 filebeat/input/filestream/config.go     |   4 +-
 filebeat/input/filestream/filestream.go |  75 +++++-
 filebeat/input/filestream/input.go      | 307 +++++++++++++++++++++++-
 3 files changed, 365 insertions(+), 21 deletions(-)

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 93b232325945..3ec076196f0b 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -48,12 +48,12 @@ type closerConfig struct {
 
 type readerCloserConfig struct {
 	AfterInterval time.Duration
-	Inactive      time.Duration
 	OnEOF         bool
 }
 
 type stateChangeCloserConfig struct {
 	CheckInterval time.Duration
+	Inactive      time.Duration
 	Removed       bool
 	Renamed       bool
 }
@@ -94,11 +94,11 @@ func defaultCloserConfig() closerConfig {
 		OnStateChange: stateChangeCloserConfig{
 			CheckInterval: 5 * time.Second,
 			Removed:       true, // TODO check clean_removed option
+			Inactive:      0 * time.Second,
 			Renamed:       false,
 		},
 		Reader: readerCloserConfig{
 			OnEOF:         false,
-			Inactive:      0 * time.Second,
 			AfterInterval: 0 * time.Second,
 		},
 	}
diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go
index 59f26ccca1b3..4d42bbf62423 100644
--- a/filebeat/input/filestream/filestream.go
+++ b/filebeat/input/filestream/filestream.go
@@ -26,6 +26,7 @@ import (
 
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/common/backoff"
+	"github.com/elastic/beats/v7/libbeat/common/file"
 	"github.com/elastic/beats/v7/libbeat/logp"
 	"github.com/elastic/go-concert/ctxtool"
 	"github.com/elastic/go-concert/unison"
@@ -43,10 +44,14 @@ type logFile struct {
 	ctx           context.Context
 	cancelReading context.CancelFunc
 
-	closeInactive      time.Duration
 	closeAfterInterval time.Duration
 	closeOnEOF         bool
 
+	checkInterval time.Duration
+	closeInactive time.Duration
+	closeRemoved  bool
+	closeRenamed  bool
+
 	offset       int64
 	lastTimeRead time.Time
 	backoff      backoff.Backoff
@@ -59,7 +64,7 @@ func newFileReader(
 	canceler input.Canceler,
 	f *os.File,
 	config readerConfig,
-	closerConfig readerCloserConfig,
+	closerConfig closerConfig,
 ) (*logFile, error) {
 	offset, err := f.Seek(0, os.SEEK_CUR)
 	if err != nil {
@@ -69,9 +74,12 @@ func newFileReader(
 	l := &logFile{
 		file:               f,
 		log:                log,
-		closeInactive:      closerConfig.Inactive,
-		closeAfterInterval: closerConfig.AfterInterval,
-		closeOnEOF:         closerConfig.OnEOF,
+		closeAfterInterval: closerConfig.Reader.AfterInterval,
+		closeOnEOF:         closerConfig.Reader.OnEOF,
+		checkInterval:      closerConfig.OnStateChange.CheckInterval,
+		closeInactive:      closerConfig.OnStateChange.Inactive,
+		closeRemoved:       closerConfig.OnStateChange.Removed,
+		closeRenamed:       closerConfig.OnStateChange.Renamed,
 		offset:             offset,
 		lastTimeRead:       time.Now(),
 		backoff:            backoff.NewExpBackoff(canceler.Done(), config.Backoff.Init, config.Backoff.Max),
@@ -143,7 +151,7 @@ func (f *logFile) startFileMonitoringIfNeeded() {
 
 	if f.closeAfterInterval > 0 {
 		f.tg.Go(func(ctx unison.Canceler) error {
-			f.closeIfInactive(ctx)
+			f.periodicStateCheck(ctx)
 			return nil
 		})
 	}
@@ -164,10 +172,8 @@ func (f *logFile) closeIfTimeout(ctx unison.Canceler) {
 	}
 }
 
-func (f *logFile) closeIfInactive(ctx unison.Canceler) {
-	// This can be made configureble if users need a more flexible
-	// cheking for inactive files.
-	ticker := time.NewTicker(5 * time.Minute)
+func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
+	ticker := time.NewTicker(f.checkInterval)
 	defer ticker.Stop()
 
 	for {
@@ -175,8 +181,7 @@ func (f *logFile) closeIfInactive(ctx unison.Canceler) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
-			age := time.Since(f.lastTimeRead)
-			if age > f.closeInactive {
+			if f.shouldBeClosed() {
 				f.cancelReading()
 				return
 			}
@@ -184,6 +189,52 @@ func (f *logFile) closeIfInactive(ctx unison.Canceler) {
 	}
 }
 
+func (f *logFile) shouldBeClosed() bool {
+	if f.closeInactive > 0 {
+		if time.Since(f.lastTimeRead) > f.closeInactive {
+			return true
+		}
+	}
+
+	if !f.closeRemoved && !f.closeRenamed {
+		return false
+
+	}
+
+	info, statErr := f.file.Stat()
+	if statErr != nil {
+		f.log.Errorf("Unexpected error reading from %s; error: %s", f.file.Name(), statErr)
+		return true
+	}
+
+	if f.closeRenamed {
+		// Check if the file can still be found under the same path
+		if !isSameFile(f.file.Name(), info) {
+			f.log.Debugf("close_renamed is enabled and file %s has been renamed", f.file.Name())
+			return true
+		}
+	}
+
+	if f.closeRemoved {
+		// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
+		if file.IsRemoved(f.file) {
+			f.log.Debugf("close_removed is enabled and file %s has been removed", f.file.Name())
+			return true
+		}
+	}
+
+	return false
+}
+
+func isSameFile(path string, info os.FileInfo) bool {
+	fileInfo, err := os.Stat(path)
+	if err != nil {
+		return false
+	}
+
+	return os.SameFile(fileInfo, info)
+}
+
 // errorChecks determines the cause for EOF errors, and how the EOF event should be handled
 // based on the config options.
 func (f *logFile) errorChecks(err error) error {
diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go
index bcd143c1c5a4..b6c6598c50bb 100644
--- a/filebeat/input/filestream/input.go
+++ b/filebeat/input/filestream/input.go
@@ -18,16 +18,27 @@
 package filestream
 
 import (
+	"fmt"
+	"os"
+
+	"golang.org/x/text/transform"
+
+	"github.com/elastic/go-concert/ctxtool"
+
 	loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
+	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common"
+	"github.com/elastic/beats/v7/libbeat/common/match"
 	"github.com/elastic/beats/v7/libbeat/feature"
 	"github.com/elastic/beats/v7/libbeat/logp"
+	"github.com/elastic/beats/v7/libbeat/reader"
+	"github.com/elastic/beats/v7/libbeat/reader/debug"
+	"github.com/elastic/beats/v7/libbeat/reader/readfile"
+	"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
 )
 
-// filestream is the input for reading from files which
-// are actively written by other applications.
-type filestream struct{}
+const pluginName = "filestream"
 
 type state struct {
 	Source         string `json:"source" struct:"source"`
@@ -35,7 +46,20 @@ type state struct {
 	IdentifierName string `json:"identifier_name" struct:"identifier_name"`
 }
 
-const pluginName = "filestream"
+// filestream is the input for reading from files which
+// are actively written by other applications.
+type filestream struct {
+	readerConfig    readerConfig
+	bufferSize      int
+	tailFile        bool // TODO
+	encodingFactory encoding.EncodingFactory
+	encoding        encoding.Encoding
+	lineTerminator  readfile.LineTerminator
+	excludeLines    []match.Matcher
+	includeLines    []match.Matcher
+	maxBytes        int
+	closerConfig    closerConfig
+}
 
 // Plugin creates a new filestream input plugin for creating a stateful input.
 func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
@@ -55,13 +79,46 @@ func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
 }
 
 func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) {
-	panic("TODO: implement me")
+	config := defaultConfig()
+	if err := cfg.Unpack(&config); err != nil {
+		return nil, nil, err
+	}
+
+	prospector, err := newFileProspector(
+		config.Paths,
+		config.IgnoreOlder,
+		config.FileWatcher,
+		config.FileIdentity,
+	)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
+	if !ok || encodingFactory == nil {
+		return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
+	}
+
+	return prospector, &filestream{
+		readerConfig:    config.Reader,
+		bufferSize:      config.Reader.BufferSize,
+		encodingFactory: encodingFactory,
+		lineTerminator:  config.Reader.LineTerminator,
+		excludeLines:    config.Reader.ExcludeLines,
+		includeLines:    config.Reader.IncludeLines,
+		maxBytes:        config.Reader.MaxBytes,
+		closerConfig:    config.Close,
+	}, nil
 }
 
 func (inp *filestream) Name() string { return pluginName }
 
 func (inp *filestream) Test(src loginp.Source, ctx input.TestContext) error {
-	panic("TODO: implement me")
+	reader, err := inp.open(ctx.Logger, ctx.Cancelation, state{})
+	if err != nil {
+		return err
+	}
+	return reader.Close()
 }
 
 func (inp *filestream) Run(
@@ -70,5 +127,241 @@ func (inp *filestream) Run(
 	cursor loginp.Cursor,
 	publisher loginp.Publisher,
 ) error {
-	panic("TODO: implement me")
+	fs, ok := src.(fileSource)
+	if !ok {
+		return fmt.Errorf("not a file source")
+	}
+
+	log := ctx.Logger.With("path", fs.newPath).With("state-id", src.Name())
+	state := initState(log, cursor, fs)
+
+	r, err := inp.open(log, ctx.Cancelation, state)
+	if err != nil {
+		log.Errorf("File could not be opened for reading: %v", err)
+		return err
+	}
+
+	_, streamCancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
+		log.Debug("Closing reader of filestream")
+		err := r.Close()
+		if err != nil {
+			log.Errorf("Error stopping filestream reader: %v", err)
+		}
+	})
+	defer streamCancel()
+
+	return inp.readFromSource(ctx, log, r, fs.newPath, state, publisher)
+}
+
+func initState(log *logp.Logger, c loginp.Cursor, s fileSource) state {
+	state := state{Source: s.newPath, IdentifierName: s.identifierGenerator}
+	if c.IsNew() {
+		return state
+	}
+
+	err := c.Unpack(&state)
+	if err != nil {
+		log.Errorf("Cannot deserialize cursor data into file state: %+v", err)
+	}
+
+	return state
+}
+
+func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, s state) (reader.Reader, error) {
+	f, err := inp.openFile(s.Source, s.Offset)
+	if err != nil {
+		return nil, err
+	}
+
+	log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
+
+	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
+	// for new lines in input stream. Simple 8-bit based encodings, or plain,
+	// don't require 'complicated' logic.
+	logReader, err := newFileReader(log, canceler, f, inp.readerConfig, inp.closerConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	dbgReader, err := debug.AppendReaders(logReader)
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+
+	// Configure MaxBytes limit for EncodeReader as multiplied by 4
+	// for the worst case scenario where incoming UTF-32 characters are decoded to single-byte UTF-8 characters.
+	// This limit serves primarily to avoid memory bloat and potential OOM with unexpectedly long lines in the file.
+	// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
+	encReaderMaxBytes := inp.maxBytes * 4
+
+	var r reader.Reader
+	r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
+		Codec:      inp.encoding,
+		BufferSize: inp.bufferSize,
+		Terminator: inp.lineTerminator,
+		MaxBytes:   encReaderMaxBytes,
+	})
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+
+	r = readfile.NewStripNewline(r, inp.lineTerminator)
+	r = readfile.NewLimitReader(r, inp.maxBytes)
+
+	return r, nil
+}
+
+// openFile opens a file and checks for the encoding. In case the encoding cannot be detected
+// or the file cannot be opened because, for example, of failing read permissions, an error
+// is returned and the harvester is closed. The file will be picked up again the next time
+// the file system is scanned.
+func (inp *filestream) openFile(path string, offset int64) (*os.File, error) {
+	err := inp.checkFileBeforeOpening(path)
+	if err != nil {
+		return nil, err
+	}
+
+	f, err := os.OpenFile(path, os.O_RDONLY, os.FileMode(0))
+	if err != nil {
+		return nil, fmt.Errorf("failed opening %s: %s", path, err)
+	}
+
+	err = inp.initFileOffset(f, offset)
+	if err != nil {
+		f.Close()
+		return nil, err
+	}
+
+	inp.encoding, err = inp.encodingFactory(f)
+	if err != nil {
+		f.Close()
+		if err == transform.ErrShortSrc {
+			return nil, fmt.Errorf("initialising encoding for '%v' failed due to file being too short", f.Name())
+		}
+		return nil, fmt.Errorf("initialising encoding for '%v' failed: %v", f.Name(), err)
+	}
+
+	return f, nil
+}
+
+func (inp *filestream) checkFileBeforeOpening(path string) error {
+	fi, err := os.Stat(path)
+	if err != nil {
+		return fmt.Errorf("failed to stat source file %s: %v", path, err)
+	}
+
+	if !fi.Mode().IsRegular() {
+		return fmt.Errorf("tried to open non-regular file: %q %s", fi.Mode(), fi.Name())
+	}
+
+	if fi.Mode()&os.ModeNamedPipe != 0 {
+		return fmt.Errorf("failed to open file %s, named pipes are not supported", path)
+	}
+
+	return nil
+}
+
+func (inp *filestream) initFileOffset(file *os.File, offset int64) error {
+	if offset > 0 {
+		_, err := file.Seek(offset, os.SEEK_SET)
+		return err
+	}
+
+	// get the offset from the file in case the encoding factory had to read some data
+	_, err := file.Seek(0, os.SEEK_CUR)
+	return err
+}
+
+func (inp *filestream) readFromSource(
+	ctx input.Context,
+	log *logp.Logger,
+	r reader.Reader,
+	path string,
+	s state,
+	p loginp.Publisher,
+) error {
+	for ctx.Cancelation.Err() == nil {
+		message, err := r.Next()
+		if err != nil {
+			switch err {
+			case ErrFileTruncate:
+				log.Info("File was truncated. Begin reading file from offset 0.")
+				s.Offset = 0
+			case ErrClosed:
+				log.Info("Reader was closed. Closing.")
+			case reader.ErrLineUnparsable:
+				log.Info("Skipping unparsable line in file.")
+				continue
+			default:
+				log.Errorf("Read line error: %v", err)
+			}
+			return nil
+		}
+
+		if message.IsEmpty() || inp.isDroppedLine(log, string(message.Content)) {
+			continue
+		}
+
+		event := inp.eventFromMessage(message, path)
+		s.Offset += int64(message.Bytes)
+
+		if err := p.Publish(event, s); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// isDroppedLine decides if the line is exported or not based on
+// the include_lines and exclude_lines options.
+func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
+	if len(inp.includeLines) > 0 {
+		if !matchAny(inp.includeLines, line) {
+			log.Debugf("Drop line as it does not match any of the include patterns: %s", line)
+			return true
+		}
+	}
+	if len(inp.excludeLines) > 0 {
+		if matchAny(inp.excludeLines, line) {
+			log.Debugf("Drop line as it matches one of the exclude patterns: %s", line)
+			return true
+		}
+	}
+
+	return false
+}
+
+func matchAny(matchers []match.Matcher, text string) bool {
+	for _, m := range matchers {
+		if m.MatchString(text) {
+			return true
+		}
+	}
+	return false
+}
+
+func (inp *filestream) eventFromMessage(m reader.Message, path string) beat.Event {
+	fields := common.MapStr{
+		"log": common.MapStr{
+			"offset": m.Bytes, // Offset here is the offset before the starting char.
+			"file": common.MapStr{
+				"path": path,
+			},
+		},
+	}
+	fields.DeepUpdate(m.Fields)
+
+	if len(m.Content) > 0 {
+		if fields == nil {
+			fields = common.MapStr{}
+		}
+		fields["message"] = string(m.Content)
+	}
+
+	return beat.Event{
+		Timestamp: m.Ts,
+		Fields:    fields,
+	}
 }