Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce close_eof harvster option #1914

Merged
merged 1 commit into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introdce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_removed and close_renamed harvester options {issue}1600[1600]
- Introduce close_eof harvester options {issue}1600[1600]


*Winlogbeat*
Expand Down
5 changes: 5 additions & 0 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ filebeat.prospectors:
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

# Closes the file handler as soon as the harvesters reaches then end of the file.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin needs rephrase: Closes the file handler as soon as the harvester reaches end of the file

# The file will be picked up again by the harvester at previous known state
# after scan_frequency in case the file can still be discovered by the prospector.
#close_eof: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ filebeat.prospectors:
# Note: Potential data loss if file reading was not finished when file was removed.
#close_removed: false

# Closes the file handler as soon as the harvesters reaches then end of the file.
# The file will be picked up again by the harvester at previous known state
# after scan_frequency in case the file can still be discovered by the prospector.
#close_eof: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin
Expand Down
4 changes: 3 additions & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
MaxBytes: 10 * (1 << 20), // 10MB
CloseRemoved: false,
CloseRenamed: false,
CloseEOF: false,
ForceCloseFiles: false,
}
)
Expand All @@ -43,6 +44,7 @@ type harvesterConfig struct {
CloseOlder time.Duration `config:"close_older"`
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
CloseEOF bool `config:"close_eof"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
Expand All @@ -53,7 +55,7 @@ type harvesterConfig struct {

func (config *harvesterConfig) Validate() error {

// TODO: remove in 7.0
// DEPRECATED: remove in 6.0
if config.ForceCloseFiles {
config.CloseRemoved = true
config.CloseRenamed = true
Expand Down
26 changes: 12 additions & 14 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"golang.org/x/text/transform"

"io"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/processor"
Expand Down Expand Up @@ -40,6 +42,7 @@ func (h *Harvester) Harvest() {
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
CloseOlder: cfg.CloseOlder,
CloseEOF: cfg.CloseEOF,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
BackoffFactor: cfg.BackoffFactor,
Expand Down Expand Up @@ -69,24 +72,19 @@ func (h *Harvester) Harvest() {
// Partial lines return error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(processor)
if err != nil {

if err == reader.ErrFileTruncate {
switch err {
case reader.ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}

if err == reader.ErrRemoved {
case reader.ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
return
case reader.ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.Path)
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.Path)
default:
logp.Info("Read line error: %s", err)
}

if err == reader.ErrRenamed {
logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path)
return
}

logp.Info("Read line error: %s", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions filebeat/harvester/reader/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type LogFileReaderConfig struct {
BackoffFactor int
CloseRenamed bool
CloseRemoved bool
CloseEOF bool
}

func NewLogFileReader(
Expand Down Expand Up @@ -81,13 +82,12 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
return n, nil
}

continuable := r.fs.Continuable()
if err == io.EOF && !continuable {
logp.Info("Reached end of file: %s", r.fs.Name())
if err == io.EOF && r.config.CloseEOF {
return n, err
}

if err != io.EOF || !continuable {
// Stdin is not continuable
if err != io.EOF || !r.fs.Continuable() {
logp.Err("Unexpected state reading from %s; error: %s", r.fs.Name(), err)
return n, err
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ filebeat.prospectors:
max_backoff: 0.1s
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}
close_eof: {{close_eof}}

{% if fields %}
fields:
Expand Down
42 changes: 42 additions & 0 deletions filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,45 @@ def test_close_removed(self):

# Make sure the state for the file was persisted
assert len(data) == 1


def test_close_eof(self):
"""
Checks that a file is closed if eof is reached
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_eof="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"
file = open(testfile1, 'w')

iterations1 = 5
for n in range(0, iterations1):
file.write("rotation file")
file.write("\n")

file.close()

filebeat = self.start_beat()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1), max_timeout=10)


# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Closing because close_eof is enabled"),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()

# Make sure the state for the file was persisted
assert len(data) == 1