From 1f97c697a49a8fcfcddb68f54c7fd9cc6b34ba1c Mon Sep 17 00:00:00 2001 From: Matthew Crenshaw Date: Fri, 5 Jul 2019 13:45:42 -0400 Subject: [PATCH 1/3] [tail/logparser] resume from last known offset when reloading * Fixes #3522 - Not able to read rotated log file without missing lines --- plugins/inputs/logparser/logparser.go | 55 +++++++++++++++++++++++---- plugins/inputs/tail/tail.go | 54 +++++++++++++++++++++----- 2 files changed, 92 insertions(+), 17 deletions(-) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index e724f2d4b6d3d..5782cd958b750 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -3,6 +3,7 @@ package logparser import ( + "fmt" "log" "strings" "sync" @@ -19,6 +20,11 @@ const ( defaultWatchMethod = "inotify" ) +var ( + offsets = make(map[string]int64) + offsetsMutex = new(sync.Mutex) +) + // LogParser in the primary interface for the plugin type GrokConfig struct { MeasurementName string `toml:"measurement"` @@ -161,23 +167,27 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - return l.tailNewfiles(l.FromBeginning) + err = l.tailNewfiles(l.FromBeginning) + + // clear resume offsets after starting + offsetsMutex.Lock() + offsets = make(map[string]int64) + offsetsMutex.Unlock() + + return err } // check the globs against files on disk, and start tailing any new files. // Assumes l's lock is held! func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { - var seek tail.SeekInfo - if !fromBeginning { - seek.Whence = 2 - seek.Offset = 0 - } - var poll bool if l.WatchMethod == "poll" { poll = true } + offsetsMutex.Lock() + defer offsetsMutex.Unlock() + // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) @@ -193,11 +203,27 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { continue } + var seek *tail.SeekInfo + if !fromBeginning { + if offset, ok := offsets[file]; ok { + log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file) + seek = &tail.SeekInfo{ + Whence: 0, + Offset: offset, + } + } else { + seek = &tail.SeekInfo{ + Whence: 2, + Offset: 0, + } + } + } + tailer, err := tail.TailFile(file, tail.Config{ ReOpen: true, Follow: true, - Location: &seek, + Location: seek, MustExist: true, Poll: poll, Logger: tail.DiscardingLogger, @@ -285,7 +311,20 @@ func (l *LogParserPlugin) Stop() { l.Lock() defer l.Unlock() + offsetsMutex.Lock() + defer offsetsMutex.Unlock() + for _, t := range l.tailers { + if !l.FromBeginning { + // store offset for resume + offset, err := t.Tell() + if err == nil { + offsets[t.Filename] = offset + log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename) + } else { + l.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", t.Filename)) + } + } err := t.Stop() //message for a stopped tailer diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 834d7cf8fafe5..693ee9e9cad93 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -19,6 +19,11 @@ const ( defaultWatchMethod = "inotify" ) +var ( + offsets = make(map[string]int64) + offsetsMutex = new(sync.Mutex) +) + type Tail struct { Files []string FromBeginning bool @@ -87,23 +92,25 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { t.acc = acc t.tailers = make(map[string]*tail.Tail) - return t.tailNewFiles(t.FromBeginning) + err := t.tailNewFiles(t.FromBeginning) + + // clear resume offsets after starting + offsetsMutex.Lock() + offsets = make(map[string]int64) + offsetsMutex.Unlock() + + return err } func (t *Tail) tailNewFiles(fromBeginning bool) error { - var seek *tail.SeekInfo - if !t.Pipe && !fromBeginning { - seek = &tail.SeekInfo{ - Whence: 2, - Offset: 0, - } - } - var poll bool if t.WatchMethod == "poll" { poll = true } + offsetsMutex.Lock() + defer offsetsMutex.Unlock() + // Create a "tailer" for each file for _, filepath := range t.Files { g, err := globpath.Compile(filepath) @@ -116,6 +123,22 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { continue } + var seek *tail.SeekInfo + if !t.Pipe && !fromBeginning { + if offset, ok := offsets[file]; ok { + log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file) + seek = &tail.SeekInfo{ + Whence: 0, + Offset: offset, + } + } else { + seek = &tail.SeekInfo{ + Whence: 2, + Offset: 0, + } + } + } + tailer, err := tail.TailFile(file, tail.Config{ ReOpen: true, @@ -205,7 +228,20 @@ func (t *Tail) Stop() { t.Lock() defer t.Unlock() + offsetsMutex.Lock() + defer offsetsMutex.Unlock() + for _, tailer := range t.tailers { + if !t.Pipe && !t.FromBeginning { + // store offset for resume + offset, err := tailer.Tell() + if err == nil { + offsets[tailer.Filename] = offset + log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename) + } else { + t.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", tailer.Filename)) + } + } err := tailer.Stop() if err != nil { t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename)) From 890921079aec9c90486fc457a87c4d907710c40f Mon Sep 17 00:00:00 2001 From: Matthew Crenshaw Date: Tue, 9 Jul 2019 18:16:19 -0400 Subject: [PATCH 2/3] better locking and logging --- plugins/inputs/logparser/logparser.go | 58 +++++++++++++++------------ plugins/inputs/tail/tail.go | 49 +++++++++++----------- plugins/inputs/tail/tail_test.go | 2 +- 3 files changed, 59 insertions(+), 50 deletions(-) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 5782cd958b750..56ff199848ea6 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/influxdata/tail" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -48,6 +49,7 @@ type LogParserPlugin struct { WatchMethod string tailers map[string]*tail.Tail + offsets map[string]int64 lines chan logEntry done chan struct{} wg sync.WaitGroup @@ -59,6 +61,20 @@ type LogParserPlugin struct { GrokConfig GrokConfig `toml:"grok"` } +func NewLogParser() *LogParserPlugin { + offsetsMutex.Lock() + offsetsCopy := make(map[string]int64, len(offsets)) + for k, v := range offsets { + offsetsCopy[k] = v + } + offsetsMutex.Unlock() + + return &LogParserPlugin{ + WatchMethod: defaultWatchMethod, + offsets: offsetsCopy, + } +} + const sampleConfig = ` ## Log files to parse. ## These accept standard unix glob matching rules, but with the addition of @@ -167,14 +183,7 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - err = l.tailNewfiles(l.FromBeginning) - - // clear resume offsets after starting - offsetsMutex.Lock() - offsets = make(map[string]int64) - offsetsMutex.Unlock() - - return err + return l.tailNewfiles(l.FromBeginning) } // check the globs against files on disk, and start tailing any new files. @@ -185,14 +194,11 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { poll = true } - offsetsMutex.Lock() - defer offsetsMutex.Unlock() - // Create a "tailer" for each file for _, filepath := range l.Files { g, err := globpath.Compile(filepath) if err != nil { - log.Printf("E! Error Glob %s failed to compile, %s", filepath, err) + log.Printf("E! [inputs.logparser] Error Glob %s failed to compile, %s", filepath, err) continue } files := g.Match() @@ -205,7 +211,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error { var seek *tail.SeekInfo if !fromBeginning { - if offset, ok := offsets[file]; ok { + if offset, ok := l.offsets[file]; ok { log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file) seek = &tail.SeekInfo{ Whence: 0, @@ -254,7 +260,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) { for line = range tailer.Lines { if line.Err != nil { - log.Printf("E! Error tailing file %s, Error: %s\n", + log.Printf("E! [inputs.logparser] Error tailing file %s, Error: %s", tailer.Filename, line.Err) continue } @@ -300,7 +306,7 @@ func (l *LogParserPlugin) parser() { l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) } } else { - log.Println("E! Error parsing log line: " + err.Error()) + log.Println("E! [inputs.logparser] Error parsing log line: " + err.Error()) } } @@ -311,37 +317,39 @@ func (l *LogParserPlugin) Stop() { l.Lock() defer l.Unlock() - offsetsMutex.Lock() - defer offsetsMutex.Unlock() - for _, t := range l.tailers { if !l.FromBeginning { // store offset for resume offset, err := t.Tell() if err == nil { - offsets[t.Filename] = offset + l.offsets[t.Filename] = offset log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename) } else { - l.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", t.Filename)) + l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename)) } } err := t.Stop() //message for a stopped tailer - log.Printf("D! tail dropped for file: %v", t.Filename) + log.Printf("D! [inputs.logparser] tail dropped for file: %v", t.Filename) if err != nil { - log.Printf("E! Error stopping tail on file %s\n", t.Filename) + log.Printf("E! [inputs.logparser] Error stopping tail on file %s", t.Filename) } } close(l.done) l.wg.Wait() + + // persist offsets + offsetsMutex.Lock() + for k, v := range l.offsets { + offsets[k] = v + } + offsetsMutex.Unlock() } func init() { inputs.Add("logparser", func() telegraf.Input { - return &LogParserPlugin{ - WatchMethod: defaultWatchMethod, - } + return NewLogParser() }) } diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 693ee9e9cad93..0f6125373dff6 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/influxdata/tail" + "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/globpath" "github.com/influxdata/telegraf/plugins/inputs" @@ -31,6 +32,7 @@ type Tail struct { WatchMethod string tailers map[string]*tail.Tail + offsets map[string]int64 parserFunc parsers.ParserFunc wg sync.WaitGroup acc telegraf.Accumulator @@ -39,8 +41,16 @@ type Tail struct { } func NewTail() *Tail { + offsetsMutex.Lock() + offsetsCopy := make(map[string]int64, len(offsets)) + for k, v := range offsets { + offsetsCopy[k] = v + } + offsetsMutex.Unlock() + return &Tail{ FromBeginning: false, + offsets: offsetsCopy, } } @@ -92,14 +102,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { t.acc = acc t.tailers = make(map[string]*tail.Tail) - err := t.tailNewFiles(t.FromBeginning) - - // clear resume offsets after starting - offsetsMutex.Lock() - offsets = make(map[string]int64) - offsetsMutex.Unlock() - - return err + return t.tailNewFiles(t.FromBeginning) } func (t *Tail) tailNewFiles(fromBeginning bool) error { @@ -108,14 +111,11 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { poll = true } - offsetsMutex.Lock() - defer offsetsMutex.Unlock() - // Create a "tailer" for each file for _, filepath := range t.Files { g, err := globpath.Compile(filepath) if err != nil { - t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err)) + t.acc.AddError(fmt.Errorf("glob %s failed to compile, %s", filepath, err)) } for _, file := range g.Match() { if _, ok := t.tailers[file]; ok { @@ -125,7 +125,7 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error { var seek *tail.SeekInfo if !t.Pipe && !fromBeginning { - if offset, ok := offsets[file]; ok { + if offset, ok := t.offsets[file]; ok { log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file) seek = &tail.SeekInfo{ Whence: 0, @@ -182,8 +182,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { var line *tail.Line for line = range tailer.Lines { if line.Err != nil { - t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", - tailer.Filename, err)) + t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err)) continue } // Fix up files with Windows line endings. @@ -211,7 +210,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time()) } } else { - t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n", + t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s", tailer.Filename, line.Text, err)) } } @@ -219,8 +218,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) { log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename) if err := tailer.Err(); err != nil { - t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n", - tailer.Filename, err)) + t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err)) } } @@ -228,27 +226,30 @@ func (t *Tail) Stop() { t.Lock() defer t.Unlock() - offsetsMutex.Lock() - defer offsetsMutex.Unlock() - for _, tailer := range t.tailers { if !t.Pipe && !t.FromBeginning { // store offset for resume offset, err := tailer.Tell() if err == nil { - offsets[tailer.Filename] = offset log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename) } else { - t.acc.AddError(fmt.Errorf("E! Error recording offset for file %s\n", tailer.Filename)) + t.acc.AddError(fmt.Errorf("error recording offset for file %s", tailer.Filename)) } } err := tailer.Stop() if err != nil { - t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename)) + t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename)) } } t.wg.Wait() + + // persist offsets + offsetsMutex.Lock() + for k, v := range t.offsets { + offsets[k] = v + } + offsetsMutex.Unlock() } func (t *Tail) SetParserFunc(fn parsers.ParserFunc) { diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go index 06db2c17234b1..fb5e05a76d003 100644 --- a/plugins/inputs/tail/tail_test.go +++ b/plugins/inputs/tail/tail_test.go @@ -108,7 +108,7 @@ func TestTailBadLine(t *testing.T) { require.NoError(t, err) acc.WaitError(1) - assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line") + assert.Contains(t, acc.Errors[0].Error(), "malformed log line") } func TestTailDosLineendings(t *testing.T) { From 97d9a5a115f1e1ebe0bb03438a369d9e50011783 Mon Sep 17 00:00:00 2001 From: Matthew Crenshaw Date: Thu, 11 Jul 2019 12:45:38 -0400 Subject: [PATCH 3/3] clear offsets after use --- plugins/inputs/logparser/logparser.go | 11 ++++++++++- plugins/inputs/tail/tail.go | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go index 56ff199848ea6..c132ba7a2ccb2 100644 --- a/plugins/inputs/logparser/logparser.go +++ b/plugins/inputs/logparser/logparser.go @@ -183,7 +183,16 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { l.wg.Add(1) go l.parser() - return l.tailNewfiles(l.FromBeginning) + err = l.tailNewfiles(l.FromBeginning) + + // clear offsets + l.offsets = make(map[string]int64) + // assumption that once Start is called, all parallel plugins have already been initialized + offsetsMutex.Lock() + offsets = make(map[string]int64) + offsetsMutex.Unlock() + + return err } // check the globs against files on disk, and start tailing any new files. diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go index 0f6125373dff6..245010764d68e 100644 --- a/plugins/inputs/tail/tail.go +++ b/plugins/inputs/tail/tail.go @@ -102,7 +102,16 @@ func (t *Tail) Start(acc telegraf.Accumulator) error { t.acc = acc t.tailers = make(map[string]*tail.Tail) - return t.tailNewFiles(t.FromBeginning) + err := t.tailNewFiles(t.FromBeginning) + + // clear offsets + t.offsets = make(map[string]int64) + // assumption that once Start is called, all parallel plugins have already been initialized + offsetsMutex.Lock() + offsets = make(map[string]int64) + offsetsMutex.Unlock() + + return err } func (t *Tail) tailNewFiles(fromBeginning bool) error {