Skip to content

Commit

Permalink
Add close_timeout option (#1926)
Browse files Browse the repository at this point in the history
close_timeout will end the harvester after the predefined time. In case the output is blocked, close_timeout will only apply on the next event sent. This is identical with the close_* options.
  • Loading branch information
ruflin authored and Steffen Siering committed Aug 31, 2016
1 parent e2b7df8 commit 001b29b
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introduce close_timeout harvester options {issue}1600[1600]


- Add harvester_limit option {pull}2417[2417]

Expand Down
11 changes: 11 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ WARNING: Only use this options if you understand that data loss is a potential s

Close eof closes a file as soon as the end of a file is reached. This is useful in case your files are only written once and not updated from time to time. This case can happen in case you are writing every single log event to a new file.

===== close_timeout

WARNING: Only use this options if you understand the potential side affects with potential data loss. In addition it can happen that multiline events are not sent completely on timeout.

Close timeout gives every harvester a predefined lifetime. Independent of the location of the reader, it will stop the reader after `close_timeout`. This option can be useful, if only a predefine time should be spent on older log files. Using this option in combination with `ignore_older` == `close_timeout` means the file is not picked up again in case it wasn't modified in between. This normally leads to data loss and not the complete file is sent.

In case close_timeout is used in combination with multiline events, it can happen that the harvester will be stopped in the middle of a multiline event, means only parts of the event will be sent. In case the harvester is continued at a later stage again and the file still exists, only the second part of the event will be sent.

Close timeout will not apply, in case your output is stuck and no further events can be sent. At least one event must be sent after close_timeout kicks in so the harvester can be closed afterwards.


[[clean-options]]
===== clean_*

Expand Down
3 changes: 2 additions & 1 deletion filebeat/docs/troubleshooting.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ There are 4 more configuration options which can be used to close file handlers,
* close_renamed
* close_removed
* close_eof
* close_timeout

`close_renamed` and `close_removed` can be useful on Windows and issues related to file rotation, see <<windows-file-rotation>>. `close_eof` can be useful in environments with a large number of files with only very few entries. More details can be found in config options, see <<configuration-filebeat-options>>.
`close_renamed` and `close_removed` can be useful on Windows and issues related to file rotation, see <<windows-file-rotation>>. `close_eof` can be useful in environments with a large number of files with only very few entries. `close_timeout` in environments where it is more important to close file handlers then to send all log lines. More details can be found in config options, see <<configuration-filebeat-options>>.

Before using any of these variables, make sure to study the documentation on each.

Expand Down
5 changes: 5 additions & 0 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ filebeat.prospectors:
# Removes the state for file which cannot be found on disk anymore immediately
#clean_removed: false

# Close timeout closes the harvester after the predefined time.
# This is independent if the harvester did finish reading the file or not.
# By default this option is disabled.
# Note: Potential data loss. Make sure to read and understand the docs for this option.
#close_timeout: 0

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ filebeat.prospectors:
# Removes the state for file which cannot be found on disk anymore immediately
#clean_removed: false

# Close timeout closes the harvester after the predefined time.
# This is independent if the harvester did finish reading the file or not.
# By default this option is disabled.
# Note: Potential data loss. Make sure to read and understand the docs for this option.
#close_timeout: 0

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
Expand Down
2 changes: 2 additions & 0 deletions filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
CloseRemoved: false,
CloseRenamed: false,
CloseEOF: false,
CloseTimeout: 0,
ForceCloseFiles: false,
}
)
Expand All @@ -46,6 +47,7 @@ type harvesterConfig struct {
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
CloseEOF bool `config:"close_eof"`
CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
Expand Down
3 changes: 2 additions & 1 deletion filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type Harvester struct {
state file.State
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
done chan struct{}
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
done chan struct{}
}

func NewHarvester(
Expand Down
21 changes: 19 additions & 2 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"expvar"
"io"
"os"
"time"

"golang.org/x/text/transform"

Expand Down Expand Up @@ -260,6 +261,7 @@ func (h *Harvester) close() {
if h.file != nil {

h.file.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
harvesterOpenFiles.Add(-1)

Expand Down Expand Up @@ -294,12 +296,27 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
fileReader, err := NewLogFile(h.file, h.config, h.done)
h.fileReader, err = NewLogFile(h.file, h.config)
if err != nil {
return nil, err
}

r, err = reader.NewEncode(fileReader, h.encoding, h.config.BufferSize)
// Closes reader after timeout or when done channel is closed
go func() {
var closeTimeout <-chan time.Time
if h.config.CloseTimeout > 0 {
closeTimeout = time.After(h.config.CloseTimeout)
}

select {
case <-h.done:
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
}
h.fileReader.Close()
}()

r, err = reader.NewEncode(h.fileReader, h.encoding, h.config.BufferSize)
if err != nil {
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions filebeat/harvester/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package harvester
import (
"io"
"os"
"sync"
"time"

"github.com/elastic/beats/filebeat/harvester/source"
Expand All @@ -17,12 +18,12 @@ type LogFile struct {
lastTimeRead time.Time
backoff time.Duration
done chan struct{}
singleClose sync.Once
}

func NewLogFile(
fs source.FileSource,
config harvesterConfig,
done chan struct{},
) (*LogFile, error) {
var offset int64
if seeker, ok := fs.(io.Seeker); ok {
Expand All @@ -39,7 +40,7 @@ func NewLogFile(
config: config,
lastTimeRead: time.Now(),
backoff: config.Backoff,
done: done,
done: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -164,3 +165,11 @@ func (r *LogFile) wait() {
}
}
}

func (r *LogFile) Close() {
// Make sure reader is only closed once
r.singleClose.Do(func() {
close(r.done)
// Note: File reader is not closed here because that leads to race conditions
})
}
1 change: 1 addition & 0 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (p *Prospector) Run() {
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

select {
case <-p.done:
logp.Info("Prospector channel stopped")
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ filebeat.prospectors:
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}
close_eof: {{close_eof}}
close_timeout: {{close_timeout}}
force_close_files: {{force_close_files}}
clean_inactive: {{clean_inactive}}
clean_removed: {{clean_removed}}
Expand Down
41 changes: 41 additions & 0 deletions filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,44 @@ def test_truncated_file_closed(self):
max_timeout=15)

filebeat.check_kill_and_wait()

def test_close_timeout(self):
"""
Checks that a file is closed after close_timeout
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_timeout="1s",
scan_frequency="1s"
)
os.mkdir(self.working_dir + "/log/")

filebeat = self.start_beat()

testfile1 = self.working_dir + "/log/test.log"
file = open(testfile1, 'w')

# Write 1000 lines with a sleep between each line to make sure it takes more then 1s to complete
iterations1 = 1000
for n in range(0, iterations1):
file.write("example data")
file.write("\n")
time.sleep(0.001)

file.close()

# Wait until harvester is closed because of ttl
self.wait_until(
lambda: self.log_contains(
"Closing harvester because close_timeout was reached"),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 1

# Check that not all but some lines were read
assert self.output_lines() < 1000
assert self.output_lines() > 0

59 changes: 56 additions & 3 deletions filebeat/tests/system/test_multiline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from filebeat import BaseTest
import os
import time

"""
Tests for the multiline log messages
Expand Down Expand Up @@ -115,9 +116,6 @@ def test_rabbitmq_multiline_log(self):
# Check that output file has the same number of lines as the log file
assert 3 == len(output)




def test_max_lines(self):
"""
Test the maximum number of lines that is sent by multiline
Expand Down Expand Up @@ -238,3 +236,58 @@ def test_max_bytes(self):

# Check that output file has the same number of lines as the log file
assert 20 == len(output)

def test_close_timeout_with_multiline(self):
"""
Test if multiline events are split up with close_timeout
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
)

os.mkdir(self.working_dir + "/log/")

testfile = self.working_dir + "/log/test.log"

with open(testfile, 'w', 0) as file:
file.write("[2015] hello world")
file.write("\n")
file.write(" First Line\n")
file.write(" Second Line\n")

proc = self.start_beat()

# Wait until harvester is closed because of timeout
# This leads to the partial event above to be sent
self.wait_until(
lambda: self.log_contains(
"Closing harvester because close_timeout was reached"),
max_timeout=15)

# Because of the timeout the following two lines should be put together
with open(testfile, 'a', 0) as file:
file.write(" This should not be third\n")
file.write(" This should not be fourth\n")
# This starts a new pattern
file.write("[2016] Hello world\n")
# This line should be appended
file.write(" First line again\n")

self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

# close_timeout must have closed the reader exactly twice
self.wait_until(
lambda: self.log_contains_count(
"Closing harvester because close_timeout was reached") == 2,
max_timeout=15)

output = self.read_output()
assert 3 == len(output)

0 comments on commit 001b29b

Please sign in to comment.