Skip to content

Commit

Permalink
Add filestream input reader (elastic#21481)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds the event reader and publisher functionality. It is mostly a refactoring of the `Harvester` from `filebeat/input/log`. Two things are still missing: metrics and special readers, e.g. `multiline`.
  • Loading branch information
kvch authored Oct 5, 2020
1 parent 527ce19 commit 40b24dd
Show file tree
Hide file tree
Showing 3 changed files with 365 additions and 21 deletions.
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ type closerConfig struct {

// readerCloserConfig groups the close settings that belong to the reader
// itself. newFileReader copies AfterInterval and OnEOF into the logFile
// fields closeAfterInterval and closeOnEOF.
type readerCloserConfig struct {
// AfterInterval is a duration; the default configuration sets it to 0.
AfterInterval time.Duration
// NOTE(review): stateChangeCloserConfig declares an Inactive field as well,
// and newFileReader also reads OnStateChange.Inactive — confirm which of the
// two is the intended home for the inactivity timeout.
Inactive time.Duration
// OnEOF is a flag; the default configuration sets it to false.
OnEOF bool
}

// stateChangeCloserConfig groups the settings used by the periodic state
// check of an open file. newFileReader copies them into the logFile fields
// checkInterval, closeInactive, closeRemoved and closeRenamed.
type stateChangeCloserConfig struct {
// CheckInterval is the period of the ticker that drives the state check
// (default: 5 seconds).
CheckInterval time.Duration
// Inactive is the time since the last read after which the file is
// reported as closable. Values <= 0 disable the check (default: 0).
Inactive time.Duration
// Removed makes the state check test whether the file has been removed
// (default: true).
Removed bool
// Renamed makes the state check test whether the original path still
// refers to the opened file (default: false).
Renamed bool
}
Expand Down Expand Up @@ -94,11 +94,11 @@ func defaultCloserConfig() closerConfig {
OnStateChange: stateChangeCloserConfig{
CheckInterval: 5 * time.Second,
Removed: true, // TODO check clean_removed option
Inactive: 0 * time.Second,
Renamed: false,
},
Reader: readerCloserConfig{
OnEOF: false,
Inactive: 0 * time.Second,
AfterInterval: 0 * time.Second,
},
}
Expand Down
75 changes: 63 additions & 12 deletions filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"
Expand All @@ -43,10 +44,14 @@ type logFile struct {
ctx context.Context
cancelReading context.CancelFunc

closeInactive time.Duration
closeAfterInterval time.Duration
closeOnEOF bool

checkInterval time.Duration
closeInactive time.Duration
closeRemoved bool
closeRenamed bool

offset int64
lastTimeRead time.Time
backoff backoff.Backoff
Expand All @@ -59,7 +64,7 @@ func newFileReader(
canceler input.Canceler,
f *os.File,
config readerConfig,
closerConfig readerCloserConfig,
closerConfig closerConfig,
) (*logFile, error) {
offset, err := f.Seek(0, os.SEEK_CUR)
if err != nil {
Expand All @@ -69,9 +74,12 @@ func newFileReader(
l := &logFile{
file: f,
log: log,
closeInactive: closerConfig.Inactive,
closeAfterInterval: closerConfig.AfterInterval,
closeOnEOF: closerConfig.OnEOF,
closeAfterInterval: closerConfig.Reader.AfterInterval,
closeOnEOF: closerConfig.Reader.OnEOF,
checkInterval: closerConfig.OnStateChange.CheckInterval,
closeInactive: closerConfig.OnStateChange.Inactive,
closeRemoved: closerConfig.OnStateChange.Removed,
closeRenamed: closerConfig.OnStateChange.Renamed,
offset: offset,
lastTimeRead: time.Now(),
backoff: backoff.NewExpBackoff(canceler.Done(), config.Backoff.Init, config.Backoff.Max),
Expand Down Expand Up @@ -143,7 +151,7 @@ func (f *logFile) startFileMonitoringIfNeeded() {

if f.closeAfterInterval > 0 {
f.tg.Go(func(ctx unison.Canceler) error {
f.closeIfInactive(ctx)
f.periodicStateCheck(ctx)
return nil
})
}
Expand All @@ -164,26 +172,69 @@ func (f *logFile) closeIfTimeout(ctx unison.Canceler) {
}
}

func (f *logFile) closeIfInactive(ctx unison.Canceler) {
// This can be made configurable if users need more flexible
// checking for inactive files.
ticker := time.NewTicker(5 * time.Minute)
func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
ticker := time.NewTicker(f.checkInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
age := time.Since(f.lastTimeRead)
if age > f.closeInactive {
if f.shouldBeClosed() {
f.cancelReading()
return
}
}
}
}

// shouldBeClosed reports whether reading from the file should stop. It is
// evaluated by periodicStateCheck on every tick and returns true when any of
// the following holds:
//
//   - closeInactive is positive and more time than that has elapsed since
//     lastTimeRead;
//   - closeRenamed or closeRemoved is set and the open file cannot be stat'ed
//     (the error is logged);
//   - closeRenamed is set and the path the file was opened with no longer
//     refers to the same file;
//   - closeRemoved is set and file.IsRemoved reports the file as removed.
//
// NOTE(review): lastTimeRead is read here from the monitoring goroutine;
// confirm that whoever updates it synchronizes access.
func (f *logFile) shouldBeClosed() bool {
	// The inactivity timeout only applies when it is configured (> 0).
	if f.closeInactive > 0 && time.Since(f.lastTimeRead) > f.closeInactive {
		return true
	}

	// Neither rename nor removal detection is requested, so there is no
	// need to touch the file system.
	if !(f.closeRemoved || f.closeRenamed) {
		return false
	}

	stat, err := f.file.Stat()
	if err != nil {
		// A handle that cannot be stat'ed is treated as closable.
		f.log.Errorf("Unexpected error reading from %s; error: %s", f.file.Name(), err)
		return true
	}

	// The rename check intentionally runs before the removal check.
	switch {
	case f.closeRenamed && !isSameFile(f.file.Name(), stat):
		// The original path no longer points at the file we hold open.
		f.log.Debugf("close_renamed is enabled and file %s has been renamed", f.file.Name())
		return true
	case f.closeRemoved && file.IsRemoved(f.file):
		// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
		f.log.Debugf("close_removed is enabled and file %s has been removed", f.file.Name())
		return true
	}

	return false
}

// isSameFile reports whether path currently refers to the same file that
// info describes. If path cannot be stat'ed (for example because it no longer
// exists) the result is false.
func isSameFile(path string, info os.FileInfo) bool {
	current, err := os.Stat(path)
	return err == nil && os.SameFile(current, info)
}

// errorChecks determines the cause for EOF errors, and how the EOF event should be handled
// based on the config options.
func (f *logFile) errorChecks(err error) error {
Expand Down
Loading

0 comments on commit 40b24dd

Please sign in to comment.