Skip to content

Commit

Permalink
Merge pull request #1914 from ruflin/fb-close-eof
Browse files Browse the repository at this point in the history
Introduce close_eof harvster option
  • Loading branch information
Steffen Siering authored Jun 28, 2016
2 parents 1a5be3c + 1cc46b6 commit ee38038
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introdce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_eof harvester options {issue}1600[1600]


*Winlogbeat*
Expand Down
5 changes: 5 additions & 0 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ filebeat.prospectors:
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

# Closes the file handler as soon as the harvesters reaches then end of the file.
# The file will be picked up again by the harvester at previous known state
# after scan_frequency in case the file can still be discovered by the prospector.
#close_eof: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ filebeat.prospectors:
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

# Closes the file handler as soon as the harvesters reaches then end of the file.
# The file will be picked up again by the harvester at previous known state
# after scan_frequency in case the file can still be discovered by the prospector.
#close_eof: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
4 changes: 3 additions & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
MaxBytes: 10 * (1 << 20), // 10MB
CloseRemoved: false,
CloseRenamed: false,
CloseEOF: false,
ForceCloseFiles: false,
}
)
Expand All @@ -43,6 +44,7 @@ type harvesterConfig struct {
CloseOlder time.Duration `config:"close_older"`
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
CloseEOF bool `config:"close_eof"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
Expand All @@ -53,7 +55,7 @@ type harvesterConfig struct {

func (config *harvesterConfig) Validate() error {

// TODO: remove in 7.0
// DEPRECATED: remove in 6.0
if config.ForceCloseFiles {
config.CloseRemoved = true
config.CloseRenamed = true
Expand Down
26 changes: 12 additions & 14 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"golang.org/x/text/transform"

"io"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/processor"
Expand Down Expand Up @@ -40,6 +42,7 @@ func (h *Harvester) Harvest() {
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
CloseOlder: cfg.CloseOlder,
CloseEOF: cfg.CloseEOF,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
BackoffFactor: cfg.BackoffFactor,
Expand Down Expand Up @@ -69,24 +72,19 @@ func (h *Harvester) Harvest() {
// Partial lines return error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(processor)
if err != nil {

if err == reader.ErrFileTruncate {
switch err {
case reader.ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}

if err == reader.ErrRemoved {
case reader.ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
return
case reader.ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.Path)
default:
logp.Info("Read line error: %s", err)
}

if err == reader.ErrRenamed {
logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path)
return
}

logp.Info("Read line error: %s", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions filebeat/harvester/reader/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type LogFileReaderConfig struct {
BackoffFactor int
CloseRenamed bool
CloseRemoved bool
CloseEOF bool
}

func NewLogFileReader(
Expand Down Expand Up @@ -81,13 +82,12 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
return n, nil
}

continuable := r.fs.Continuable()
if err == io.EOF && !continuable {
logp.Info("Reached end of file: %s", r.fs.Name())
if err == io.EOF && r.config.CloseEOF {
return n, err
}

if err != io.EOF || !continuable {
// Stdin is not continuable
if err != io.EOF || !r.fs.Continuable() {
logp.Err("Unexpected state reading from %s; error: %s", r.fs.Name(), err)
return n, err
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ filebeat.prospectors:
max_backoff: 0.1s
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}
close_eof: {{close_eof}}

{% if fields %}
fields:
Expand Down
42 changes: 42 additions & 0 deletions filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,45 @@ def test_close_removed(self):

# Make sure the state for the file was persisted
assert len(data) == 1


def test_close_eof(self):
"""
Checks that a file is closed if eof is reached
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_eof="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"
file = open(testfile1, 'w')

iterations1 = 5
for n in range(0, iterations1):
file.write("rotation file")
file.write("\n")

file.close()

filebeat = self.start_beat()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1), max_timeout=10)


# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Closing because close_eof is enabled"),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()

# Make sure the state for the file was persisted
assert len(data) == 1

0 comments on commit ee38038

Please sign in to comment.