Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tail/logparser] resume from last known offset when reloading #6074

Merged
merged 3 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 72 additions & 16 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
package logparser

import (
"fmt"
"log"
"strings"
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -19,6 +21,11 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

// LogParser in the primary interface for the plugin
type GrokConfig struct {
MeasurementName string `toml:"measurement"`
Expand All @@ -42,6 +49,7 @@ type LogParserPlugin struct {
WatchMethod string

tailers map[string]*tail.Tail
offsets map[string]int64
lines chan logEntry
done chan struct{}
wg sync.WaitGroup
Expand All @@ -53,6 +61,20 @@ type LogParserPlugin struct {
GrokConfig GrokConfig `toml:"grok"`
}

func NewLogParser() *LogParserPlugin {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
for k, v := range offsets {
offsetsCopy[k] = v
}
offsetsMutex.Unlock()

return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
offsets: offsetsCopy,
}
}

const sampleConfig = `
## Log files to parse.
## These accept standard unix glob matching rules, but with the addition of
Expand Down Expand Up @@ -161,18 +183,21 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

return l.tailNewfiles(l.FromBeginning)
err = l.tailNewfiles(l.FromBeginning)

// clear offsets
l.offsets = make(map[string]int64)
// assumption that once Start is called, all parallel plugins have already been initialized
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()

return err
}

// check the globs against files on disk, and start tailing any new files.
// Assumes l's lock is held!
func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
var seek tail.SeekInfo
if !fromBeginning {
seek.Whence = 2
seek.Offset = 0
}

var poll bool
if l.WatchMethod == "poll" {
poll = true
Expand All @@ -182,7 +207,7 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
log.Printf("E! [inputs.logparser] Error Glob %s failed to compile, %s", filepath, err)
continue
}
files := g.Match()
Expand All @@ -193,11 +218,27 @@ func (l *LogParserPlugin) tailNewfiles(fromBeginning bool) error {
continue
}

var seek *tail.SeekInfo
if !fromBeginning {
if offset, ok := l.offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
Location: seek,
MustExist: true,
Poll: poll,
Logger: tail.DiscardingLogger,
Expand Down Expand Up @@ -228,7 +269,7 @@ func (l *LogParserPlugin) receiver(tailer *tail.Tail) {
for line = range tailer.Lines {

if line.Err != nil {
log.Printf("E! Error tailing file %s, Error: %s\n",
log.Printf("E! [inputs.logparser] Error tailing file %s, Error: %s",
tailer.Filename, line.Err)
continue
}
Expand Down Expand Up @@ -274,7 +315,7 @@ func (l *LogParserPlugin) parser() {
l.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
log.Println("E! Error parsing log line: " + err.Error())
log.Println("E! [inputs.logparser] Error parsing log line: " + err.Error())
}

}
Expand All @@ -286,23 +327,38 @@ func (l *LogParserPlugin) Stop() {
defer l.Unlock()

for _, t := range l.tailers {
if !l.FromBeginning {
// store offset for resume
offset, err := t.Tell()
if err == nil {
l.offsets[t.Filename] = offset
log.Printf("D! [inputs.logparser] recording offset %d for file: %v", offset, t.Filename)
} else {
l.acc.AddError(fmt.Errorf("error recording offset for file %s", t.Filename))
}
}
err := t.Stop()

//message for a stopped tailer
log.Printf("D! tail dropped for file: %v", t.Filename)
log.Printf("D! [inputs.logparser] tail dropped for file: %v", t.Filename)

if err != nil {
log.Printf("E! Error stopping tail on file %s\n", t.Filename)
log.Printf("E! [inputs.logparser] Error stopping tail on file %s", t.Filename)
}
}
close(l.done)
l.wg.Wait()

// persist offsets
offsetsMutex.Lock()
for k, v := range l.offsets {
offsets[k] = v
}
offsetsMutex.Unlock()
}

func init() {
inputs.Add("logparser", func() telegraf.Input {
return &LogParserPlugin{
WatchMethod: defaultWatchMethod,
}
return NewLogParser()
})
}
78 changes: 62 additions & 16 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/influxdata/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"
Expand All @@ -19,13 +20,19 @@ const (
defaultWatchMethod = "inotify"
)

var (
offsets = make(map[string]int64)
offsetsMutex = new(sync.Mutex)
)

type Tail struct {
Files []string
FromBeginning bool
Pipe bool
WatchMethod string

tailers map[string]*tail.Tail
offsets map[string]int64
parserFunc parsers.ParserFunc
wg sync.WaitGroup
acc telegraf.Accumulator
Expand All @@ -34,8 +41,16 @@ type Tail struct {
}

func NewTail() *Tail {
offsetsMutex.Lock()
offsetsCopy := make(map[string]int64, len(offsets))
for k, v := range offsets {
offsetsCopy[k] = v
}
offsetsMutex.Unlock()

return &Tail{
FromBeginning: false,
offsets: offsetsCopy,
}
}

Expand Down Expand Up @@ -87,18 +102,19 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
t.acc = acc
t.tailers = make(map[string]*tail.Tail)

return t.tailNewFiles(t.FromBeginning)
err := t.tailNewFiles(t.FromBeginning)

// clear offsets
t.offsets = make(map[string]int64)
// assumption that once Start is called, all parallel plugins have already been initialized
offsetsMutex.Lock()
offsets = make(map[string]int64)
offsetsMutex.Unlock()
sgtsquiggs marked this conversation as resolved.
Show resolved Hide resolved

return err
}

func (t *Tail) tailNewFiles(fromBeginning bool) error {
var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}

var poll bool
if t.WatchMethod == "poll" {
poll = true
Expand All @@ -108,14 +124,30 @@ func (t *Tail) tailNewFiles(fromBeginning bool) error {
for _, filepath := range t.Files {
g, err := globpath.Compile(filepath)
if err != nil {
t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
t.acc.AddError(fmt.Errorf("glob %s failed to compile, %s", filepath, err))
}
for _, file := range g.Match() {
if _, ok := t.tailers[file]; ok {
// we're already tailing this file
continue
}

var seek *tail.SeekInfo
if !t.Pipe && !fromBeginning {
if offset, ok := t.offsets[file]; ok {
log.Printf("D! [inputs.tail] using offset %d for file: %v", offset, file)
seek = &tail.SeekInfo{
Whence: 0,
Offset: offset,
}
} else {
seek = &tail.SeekInfo{
Whence: 2,
Offset: 0,
}
}
}

tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Expand Down Expand Up @@ -159,8 +191,7 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
var line *tail.Line
for line = range tailer.Lines {
if line.Err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
continue
}
// Fix up files with Windows line endings.
Expand Down Expand Up @@ -188,16 +219,15 @@ func (t *Tail) receiver(parser parsers.Parser, tailer *tail.Tail) {
t.acc.AddFields(m.Name(), m.Fields(), tags, m.Time())
}
} else {
t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
t.acc.AddError(fmt.Errorf("malformed log line in %s: [%s], Error: %s",
tailer.Filename, line.Text, err))
}
}

log.Printf("D! [inputs.tail] tail removed for file: %v", tailer.Filename)

if err := tailer.Err(); err != nil {
t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
tailer.Filename, err))
t.acc.AddError(fmt.Errorf("error tailing file %s, Error: %s", tailer.Filename, err))
}
}

Expand All @@ -206,13 +236,29 @@ func (t *Tail) Stop() {
defer t.Unlock()

for _, tailer := range t.tailers {
if !t.Pipe && !t.FromBeginning {
// store offset for resume
offset, err := tailer.Tell()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it safe to call this after the tailer has Stopped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, but if you stop first Tell returns an offset of 0.

when tailFileSync() finishes, it calls tail.close() which calls tail.closeFile() which sets tailer.file = nil, which causes Tell() to return an zero value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly unfortunate, but worse case we reprocess some lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An improvement to the tail lib could resolve this

if err == nil {
log.Printf("D! [inputs.tail] recording offset %d for file: %v", offset, tailer.Filename)
} else {
t.acc.AddError(fmt.Errorf("error recording offset for file %s", tailer.Filename))
}
}
err := tailer.Stop()
if err != nil {
t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
t.acc.AddError(fmt.Errorf("error stopping tail on file %s", tailer.Filename))
}
}

t.wg.Wait()

// persist offsets
offsetsMutex.Lock()
for k, v := range t.offsets {
offsets[k] = v
}
offsetsMutex.Unlock()
}

func (t *Tail) SetParserFunc(fn parsers.ParserFunc) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/tail/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestTailBadLine(t *testing.T) {
require.NoError(t, err)

acc.WaitError(1)
assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
assert.Contains(t, acc.Errors[0].Error(), "malformed log line")
}

func TestTailDosLineendings(t *testing.T) {
Expand Down