Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Prospector to Update state without Harvester #1978

Merged
merged 1 commit into from
Jul 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package harvester
import (
"fmt"
"regexp"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -27,12 +26,11 @@ import (
)

type Harvester struct {
Path string /* the file path to harvest */
Config harvesterConfig
path string /* the file path to harvest */
config harvesterConfig
offset int64
State file.State
stateMutex sync.Mutex
SpoolerChan chan *input.FileEvent
state file.State
prospectorChan chan *input.FileEvent
encoding encoding.EncodingFactory
file source.FileSource /* the file being watched */
ExcludeLinesRegexp []*regexp.Regexp
Expand All @@ -44,42 +42,42 @@ func NewHarvester(
cfg *common.Config,
path string,
state file.State,
spooler chan *input.FileEvent,
prospectorChan chan *input.FileEvent,
offset int64,
done chan struct{},
) (*Harvester, error) {

h := &Harvester{
Path: path,
Config: defaultConfig,
State: state,
SpoolerChan: spooler,
offset: offset,
done: done,
path: path,
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
offset: offset,
done: done,
}

if err := cfg.Unpack(&h.Config); err != nil {
if err := cfg.Unpack(&h.config); err != nil {
return nil, err
}
if err := h.Config.Validate(); err != nil {
if err := h.config.Validate(); err != nil {
return nil, err
}

encoding, ok := encoding.FindEncoding(h.Config.Encoding)
encoding, ok := encoding.FindEncoding(h.config.Encoding)
if !ok || encoding == nil {
return nil, fmt.Errorf("unknown encoding('%v')", h.Config.Encoding)
return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
}
h.encoding = encoding

h.ExcludeLinesRegexp = h.Config.ExcludeLines
h.IncludeLinesRegexp = h.Config.IncludeLines
h.ExcludeLinesRegexp = h.config.ExcludeLines
h.IncludeLinesRegexp = h.config.IncludeLines
return h, nil
}

// open opens the file given under h.path and assigns the file handler to h.file
func (h *Harvester) open() (encoding.Encoding, error) {

switch h.Config.InputType {
switch h.config.InputType {
case config.StdinInputType:
return h.openStdin()
case config.LogInputType:
Expand Down
4 changes: 2 additions & 2 deletions filebeat/harvester/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
func TestExampleTest(t *testing.T) {

h := Harvester{
Path: "/var/log/",
path: "/var/log/",
offset: 0,
}

assert.Equal(t, "/var/log/", h.Path)
assert.Equal(t, "/var/log/", h.path)
}
83 changes: 41 additions & 42 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ func (h *Harvester) Harvest() {
// Makes sure file is properly closed when the harvester is stopped
defer h.close()

h.State.Finished = false
h.state.Finished = false

enc, err := h.open()
if err != nil {
logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
return
}

logp.Info("Harvester started for file: %s", h.Path)
logp.Info("Harvester started for file: %s", h.path)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
cfg := h.Config
cfg := h.config
readerConfig := reader.LogFileReaderConfig{
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
Expand All @@ -57,7 +57,7 @@ func (h *Harvester) Harvest() {
}

// Always report the state before starting a harvester
if !h.SendStateUpdate() {
if !h.sendStateUpdate() {
return
}

Expand All @@ -74,14 +74,14 @@ func (h *Harvester) Harvest() {
if err != nil {
switch err {
case reader.ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.path)
h.SetOffset(0)
case reader.ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.path)
case reader.ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path)
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.path)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.Path)
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.path)
default:
logp.Info("Read line error: %s", err)
}
Expand All @@ -94,7 +94,6 @@ func (h *Harvester) Harvest() {
event := h.createEvent()

if h.shouldExportLine(text) {

event.ReadTime = ts
event.Bytes = bytesRead
event.Text = &text
Expand All @@ -113,19 +112,17 @@ func (h *Harvester) Harvest() {
// of a harvester
func (h *Harvester) createEvent() *input.FileEvent {
event := &input.FileEvent{
EventMetadata: h.Config.EventMetadata,
Source: h.Path,
InputType: h.Config.InputType,
DocumentType: h.Config.DocumentType,
EventMetadata: h.config.EventMetadata,
Source: h.path,
InputType: h.config.InputType,
DocumentType: h.config.DocumentType,
Offset: h.getOffset(),
Bytes: 0,
Fileinfo: h.State.Fileinfo,
JSONConfig: h.Config.JSON,
Fileinfo: h.state.Fileinfo,
JSONConfig: h.config.JSON,
State: h.getState(),
}

if h.Config.InputType != config.StdinInputType {
event.FileState = h.GetState()
}
return event
}

Expand All @@ -135,7 +132,7 @@ func (h *Harvester) sendEvent(event *input.FileEvent) bool {
select {
case <-h.done:
return false
case h.SpoolerChan <- event: // ship the new event downstream
case h.prospectorChan <- event: // ship the new event downstream
return true
}
}
Expand Down Expand Up @@ -169,9 +166,9 @@ func (h *Harvester) shouldExportLine(line string) bool {
func (h *Harvester) openFile() (encoding.Encoding, error) {
var encoding encoding.Encoding

f, err := file.ReadOpen(h.Path)
f, err := file.ReadOpen(h.path)
if err != nil {
logp.Err("Failed opening %s: %s", h.Path, err)
logp.Err("Failed opening %s: %s", h.path, err)
return nil, err
}

Expand All @@ -182,11 +179,11 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {

info, err := f.Stat()
if err != nil {
logp.Err("Failed getting stats for file %s: %s", h.Path, err)
logp.Err("Failed getting stats for file %s: %s", h.path, err)
return nil, err
}
// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
if !os.SameFile(h.State.Fileinfo, info) {
if !os.SameFile(h.state.Fileinfo, info) {
return nil, errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
}

Expand Down Expand Up @@ -219,20 +216,20 @@ func (h *Harvester) initFileOffset(file *os.File) error {
// continue from last known offset

logp.Debug("harvester",
"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.getOffset(), offset)
"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset)
_, err = file.Seek(h.getOffset(), os.SEEK_SET)
} else if h.Config.TailFiles {
} else if h.config.TailFiles {
// tail file if file is new and tail_files config is set

logp.Debug("harvester",
"harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
"harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
offset, err = file.Seek(0, os.SEEK_END)
h.SetOffset(offset)

} else {
// get offset from file in case of encoding factory was
// required to read some data.
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.path, offset)
h.SetOffset(offset)
}

Expand All @@ -251,43 +248,45 @@ func (h *Harvester) updateOffset(increment int64) {
h.offset += increment
}

// SendStateUpdate send an empty event with the current state to update the registry
func (h *Harvester) SendStateUpdate() bool {
logp.Debug("harvester", "Update state: %s, offset: %v", h.Path, h.offset)
// sendStateUpdate send an empty event with the current state to update the registry
func (h *Harvester) sendStateUpdate() bool {
logp.Debug("harvester", "Update state: %s, offset: %v", h.path, h.offset)
return h.sendEvent(h.createEvent())
}

func (h *Harvester) GetState() file.State {
h.stateMutex.Lock()
defer h.stateMutex.Unlock()
func (h *Harvester) getState() file.State {

if h.config.InputType == config.StdinInputType {
return file.State{}
}

h.refreshState()
return h.State
return h.state
}

// refreshState refreshes the values in State with the values from the harvester itself
func (h *Harvester) refreshState() {
h.State.Source = h.Path
h.State.Offset = h.getOffset()
h.State.FileStateOS = file.GetOSState(h.State.Fileinfo)
h.state.Source = h.path
h.state.Offset = h.getOffset()
h.state.FileStateOS = file.GetOSState(h.state.Fileinfo)
}

func (h *Harvester) close() {
// Mark harvester as finished
h.State.Finished = true
h.state.Finished = true

// On completion, push offset so we can continue where we left off if we relaunch on the same file
h.SendStateUpdate()
h.sendStateUpdate()

logp.Debug("harvester", "Stopping harvester for file: %s", h.Path)
logp.Debug("harvester", "Stopping harvester for file: %s", h.path)

// Make sure file is closed as soon as harvester exits
// If file was never opened, it can't be closed
if h.file != nil {
h.file.Close()
logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
logp.Debug("harvester", "Stopping harvester, closing file: %s", h.path)
} else {
logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.Path)
logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.path)
}
}

Expand Down
8 changes: 7 additions & 1 deletion filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ type FileEvent struct {
Fileinfo os.FileInfo
JSONFields common.MapStr
JSONConfig *processor.JSONConfig
FileState file.State
State file.State
}

func NewEvent(state file.State) *FileEvent {
return &FileEvent{
State: state,
}
}

func (f *FileEvent) ToMapStr() common.MapStr {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ func (p *Prospector) Run() {
case event := <-p.harvesterChan:
// Add ttl if cleanOlder is enabled
if p.config.CleanOlder > 0 {
event.FileState.TTL = p.config.CleanOlder
event.State.TTL = p.config.CleanOlder
}
select {
case <-p.done:
logp.Info("Prospector channel stopped")
return
case p.spoolerChan <- event:
p.states.Update(event.FileState)
p.states.Update(event.State)
}
}
}
Expand Down
19 changes: 7 additions & 12 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"time"

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

type ProspectorLog struct {
Prospector *Prospector
config prospectorConfig
lastScan time.Time
lastClean time.Time
}

Expand Down Expand Up @@ -65,8 +65,8 @@ func (p *ProspectorLog) Run() {
_, err := os.Stat(state.Source)
if err != nil {
state.TTL = 0
h, _ := p.Prospector.createHarvester(state)
h.SendStateUpdate()
event := input.NewEvent(state)
p.Prospector.harvesterChan <- event
logp.Debug("prospector", "Cleanup state for file as file removed: %s", state.Source)
}
}
Expand Down Expand Up @@ -123,8 +123,6 @@ func (p *ProspectorLog) getFiles() map[string]os.FileInfo {
// Scan starts a scanGlob for each provided path/glob
func (p *ProspectorLog) scan() {

newLastScan := time.Now()

// TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed?
// Now let's do one quick scan to pick up new files
for f, fileinfo := range p.getFiles() {
Expand All @@ -144,9 +142,6 @@ func (p *ProspectorLog) scan() {
p.harvestExistingFile(newState, lastState)
}
}

// Only update lastScan timestamp after scan is completed
p.lastScan = newLastScan
}

// harvestNewFile harvest a new file
Expand Down Expand Up @@ -182,11 +177,11 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
// or no new lines were detected. It sends only an event status update to make sure the new name is persisted.
logp.Debug("prospector", "File rename was detected, updating state: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset)

h, _ := p.Prospector.createHarvester(newState)
h.SetOffset(oldState.Offset)

// Update state because of file rotation
h.SendStateUpdate()
newState.Offset = oldState.Offset
event := input.NewEvent(newState)
p.Prospector.harvesterChan <- event

} else {
// TODO: improve logging dependent on what the exact reason is that harvesting does not continue
// Nothing to do. Harvester is still running and file was not renamed
Expand Down
Loading