Merge pull request #1896 from ruflin/harvester-cleanup
Cleanup Harvester and Input parts of Filebeat
Steffen Siering authored Jun 24, 2016
2 parents 2d95eb8 + 6ce744e commit 08960d1
Showing 40 changed files with 668 additions and 638 deletions.
18 changes: 4 additions & 14 deletions filebeat/config/config.go
@@ -14,19 +14,10 @@ import (

// Defaults for config variables which are not set
const (
DefaultRegistryFile = "registry"
DefaultCloseOlder time.Duration = 1 * time.Hour
DefaultSpoolSize uint64 = 2048
DefaultIdleTimeout time.Duration = 5 * time.Second
DefaultHarvesterBufferSize int = 16 << 10 // 16384
DefaultInputType = "log"
DefaultDocumentType = "log"
DefaultTailFiles = false
DefaultBackoff = 1 * time.Second
DefaultBackoffFactor = 2
DefaultMaxBackoff = 10 * time.Second
DefaultForceCloseFiles = false
DefaultMaxBytes = 10 * (1 << 20) // 10MB
DefaultRegistryFile string = "registry"
DefaultSpoolSize uint64 = 2048
DefaultIdleTimeout time.Duration = 5 * time.Second
DefaultInputType = "log"
)

type Config struct {
@@ -122,7 +113,6 @@ func (config *Config) FetchConfigs() {
}

err = mergeConfigFiles(configFiles, config)

if err != nil {
log.Fatal("Error merging config files: ", err)
}
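
The per-harvester defaults deleted above are not gone; they reappear as literals in the harvester package's defaultConfig (see filebeat/harvester/config.go below), so each option's default now lives next to the code that consumes it. A minimal sketch of how such a defaults struct is typically applied, assuming libbeat's usual copy-then-unpack pattern; the helper name is illustrative and not part of this commit:

// Illustrative only: resolve harvester settings by copying the package-level
// defaults, then overlaying whatever the user configured on top.
func unpackHarvesterConfig(cfg *common.Config) (harvesterConfig, error) {
	config := defaultConfig // struct copy of the defaults
	if err := cfg.Unpack(&config); err != nil {
		return harvesterConfig{}, err
	}
	if err := config.Validate(); err != nil {
		return harvesterConfig{}, err
	}
	return config, nil
}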
4 changes: 2 additions & 2 deletions filebeat/crawler/crawler.go
@@ -4,7 +4,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/spooler"
"github.com/elastic/beats/libbeat/common"
@@ -42,7 +42,7 @@ func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler
}, nil
}

func (c *Crawler) Start(states input.States) error {
func (c *Crawler) Start(states file.States) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

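The crawler now starts from file.States rather than input.States: the registry of per-file states moved into input/file along with the other file-oriented types. As a self-contained sketch of what a concurrency-safe states collection of this shape can look like (the field and method names here are assumptions for illustration, not the actual input/file API):

import "sync"

// Hypothetical sketch of a registry of per-file states, safe for concurrent
// use. State is reduced to the fields the example needs; Source is the file
// path, as set by refreshState further down in this diff.
type State struct {
	Source string
	Offset int64
}

type States struct {
	mu     sync.Mutex
	states []State
}

// Update replaces the state for an existing source or appends a new one.
func (s *States) Update(st State) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for i := range s.states {
		if s.states[i].Source == st.Source {
			s.states[i] = st
			return
		}
	}
	s.states = append(s.states, st)
}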
52 changes: 26 additions & 26 deletions filebeat/harvester/config.go
@@ -6,42 +6,42 @@ import (
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/harvester/processor"
"github.com/elastic/beats/libbeat/common"
)

var (
defaultConfig = harvesterConfig{
BufferSize: cfg.DefaultHarvesterBufferSize,
DocumentType: cfg.DefaultDocumentType,
BufferSize: 16 << 10, // 16384
DocumentType: "log",
InputType: cfg.DefaultInputType,
TailFiles: cfg.DefaultTailFiles,
Backoff: cfg.DefaultBackoff,
BackoffFactor: cfg.DefaultBackoffFactor,
MaxBackoff: cfg.DefaultMaxBackoff,
CloseOlder: cfg.DefaultCloseOlder,
ForceCloseFiles: cfg.DefaultForceCloseFiles,
MaxBytes: cfg.DefaultMaxBytes,
TailFiles: false,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
CloseOlder: 1 * time.Hour,
ForceCloseFiles: false,
MaxBytes: 10 * (1 << 20), // 10MB
}
)

type harvesterConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
BufferSize int `config:"harvester_buffer_size"`
DocumentType string `config:"document_type"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
TailFiles bool `config:"tail_files"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
CloseOlder time.Duration `config:"close_older"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *input.MultilineConfig `config:"multiline"`
JSON *input.JSONConfig `config:"json"`
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
BufferSize int `config:"harvester_buffer_size"`
DocumentType string `config:"document_type"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
TailFiles bool `config:"tail_files"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
CloseOlder time.Duration `config:"close_older"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *processor.MultilineConfig `config:"multiline"`
JSON *processor.JSONConfig `config:"json"`
}

func (config *harvesterConfig) Validate() error {
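
The exclude_lines and include_lines options compile to the regexp slices declared above and are applied per line by shouldExportLine in filebeat/harvester/log.go (further down). A self-contained sketch of the documented semantics, include_lines first, then exclude_lines; the helper names are illustrative:

// Illustrative: a line is exported only if it matches some include pattern
// (when include_lines is set) and matches no exclude pattern.
func shouldExport(line string, include, exclude []*regexp.Regexp) bool {
	if len(include) > 0 && !matchAny(include, line) {
		return false
	}
	return !matchAny(exclude, line)
}

func matchAny(patterns []*regexp.Regexp, s string) bool {
	for _, re := range patterns {
		if re.MatchString(s) {
			return true
		}
	}
	return false
}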
4 changes: 0 additions & 4 deletions filebeat/harvester/file.go

This file was deleted.

8 changes: 5 additions & 3 deletions filebeat/harvester/harvester.go
@@ -20,19 +20,21 @@ import (

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
)

type Harvester struct {
Path string /* the file path to harvest */
Config harvesterConfig
offset int64
State input.FileState
State file.State
stateMutex sync.Mutex
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
file FileSource /* the file being watched */
file source.FileSource /* the file being watched */
ExcludeLinesRegexp []*regexp.Regexp
IncludeLinesRegexp []*regexp.Regexp
done chan struct{}
@@ -41,7 +43,7 @@ type Harvester struct {
func NewHarvester(
cfg *common.Config,
path string,
state input.FileState,
state file.State,
spooler chan *input.FileEvent,
offset int64,
done chan struct{},
Expand Down
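NewHarvester threads a done channel through to the harvester, the usual Go idiom for cooperative shutdown: long-running loops select on done and return once it is closed, which is also how the log reader below aborts blocked reads. A generic sketch of the idiom, not the harvester's actual loop:

// Generic shutdown pattern: process input until done is closed.
func run(done <-chan struct{}, lines <-chan string) {
	for {
		select {
		case <-done:
			return // owner closed the channel; stop promptly
		case line := <-lines:
			fmt.Println(line)
		}
	}
}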
1 change: 0 additions & 1 deletion filebeat/harvester/harvester_test.go
@@ -19,5 +19,4 @@ func TestExampleTest(t *testing.T) {
}

assert.Equal(t, "/var/log/", h.Path)

}
46 changes: 0 additions & 46 deletions filebeat/harvester/linereader.go

This file was deleted.

87 changes: 64 additions & 23 deletions filebeat/harvester/log.go
@@ -8,12 +8,18 @@ import (

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/processor"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

// Log harvester reads files line by line and sends events to the defined output
func (h *Harvester) Harvest() {

// Makes sure file is properly closed when the harvester is stopped
defer h.close()

h.State.Finished = false
@@ -30,15 +36,15 @@ func (h *Harvester) Harvest() {
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
cfg := h.Config
readerConfig := logFileReaderConfig{
forceClose: cfg.ForceCloseFiles,
closeOlder: cfg.CloseOlder,
backoffDuration: cfg.Backoff,
maxBackoffDuration: cfg.MaxBackoff,
backoffFactor: cfg.BackoffFactor,
readerConfig := reader.LogFileReaderConfig{
ForceClose: cfg.ForceCloseFiles,
CloseOlder: cfg.CloseOlder,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
BackoffFactor: cfg.BackoffFactor,
}

reader, err := createLineReader(
processor, err := createLineProcessor(
h.file, enc, cfg.BufferSize, cfg.MaxBytes, readerConfig,
cfg.JSON, cfg.Multiline, h.done)
if err != nil {
@@ -60,10 +66,10 @@ func (h *Harvester) Harvest() {
}

// Partial lines return error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(reader)
ts, text, bytesRead, jsonFields, err := readLine(processor)
if err != nil {
if err == errFileTruncate {
logp.Warn("File was truncated. Begin reading file from offset 0: %s", h.Path)
if err == reader.ErrFileTruncate {
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}
@@ -151,24 +157,22 @@ func (h *Harvester) shouldExportLine(line string) bool {
// is returned and the harvester is closed. The file will be picked up again the next time
// the file system is scanned
func (h *Harvester) openFile() (encoding.Encoding, error) {
var file *os.File
var err error
var encoding encoding.Encoding

file, err = input.ReadOpen(h.Path)
f, err := file.ReadOpen(h.Path)
if err == nil {
// Check we are not following a rabbit hole (symlinks, etc.)
if !input.IsRegularFile(file) {
if !file.IsRegularFile(f) {
return nil, errors.New("Given file is not a regular file.")
}

encoding, err = h.encoding(file)
encoding, err = h.encoding(f)
if err != nil {

if err == transform.ErrShortSrc {
logp.Info("Initialising encoding for '%v' failed due to file being too short", file)
logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
} else {
logp.Err("Initialising encoding for '%v' failed: %v", file, err)
logp.Err("Initialising encoding for '%v' failed: %v", f, err)
}
return nil, err
}
@@ -179,13 +183,13 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
}

// update file offset
err = h.initFileOffset(file)
err = h.initFileOffset(f)
if err != nil {
return nil, err
}

// yay, open file
h.file = fileSource{file}
h.file = source.File{f}
return encoding, nil
}

@@ -234,7 +238,7 @@ func (h *Harvester) SendStateUpdate() bool {
return h.sendEvent(h.createEvent())
}

func (h *Harvester) GetState() input.FileState {
func (h *Harvester) GetState() file.State {
h.stateMutex.Lock()
defer h.stateMutex.Unlock()

@@ -244,10 +248,9 @@ func (h *Harvester) GetState() input.FileState {

// refreshState refreshes the values in State with the values from the harvester itself
func (h *Harvester) refreshState() {

h.State.Source = h.Path
h.State.Offset = h.getOffset()
h.State.FileStateOS = input.GetOSFileState(h.State.Fileinfo)
h.State.FileStateOS = file.GetOSState(h.State.Fileinfo)
}

func (h *Harvester) close() {
@@ -265,6 +268,44 @@ func (h *Harvester) close() {
h.file.Close()
logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
} else {
logp.Debug("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path)
logp.Warn("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path)
}
}

func createLineProcessor(
in source.FileSource,
codec encoding.Encoding,
bufferSize int,
maxBytes int,
readerConfig reader.LogFileReaderConfig,
jsonConfig *processor.JSONConfig,
mlrConfig *processor.MultilineConfig,
done chan struct{},
) (processor.LineProcessor, error) {
var p processor.LineProcessor
var err error

fileReader, err := reader.NewLogFileReader(in, readerConfig, done)
if err != nil {
return nil, err
}

p, err = processor.NewLineEncoder(fileReader, codec, bufferSize)
if err != nil {
return nil, err
}

if jsonConfig != nil {
p = processor.NewJSONProcessor(p, jsonConfig)
}

p = processor.NewStripNewline(p)
if mlrConfig != nil {
p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
if err != nil {
return nil, err
}
}

return processor.NewLimitProcessor(p, maxBytes), nil
}
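
createLineProcessor assembles the read path by decoration: each stage wraps the one below (file reader, decoder, optional JSON, newline stripping, optional multiline, byte limit) and transforms what the inner stage returns. A self-contained toy version of the same layering; the interface and types here are stand-ins, not filebeat's processor package:

package main

import (
	"fmt"
	"io"
	"strings"
)

// lineSource is a toy stand-in for a Next-style line processor.
type lineSource interface {
	Next() (string, error)
}

// sliceSource is the innermost stage, yielding canned lines.
type sliceSource struct{ lines []string }

func (s *sliceSource) Next() (string, error) {
	if len(s.lines) == 0 {
		return "", io.EOF
	}
	line := s.lines[0]
	s.lines = s.lines[1:]
	return line, nil
}

// stripNewline decorates another source, in the spirit of NewStripNewline.
type stripNewline struct{ in lineSource }

func (p *stripNewline) Next() (string, error) {
	line, err := p.in.Next()
	return strings.TrimRight(line, "\r\n"), err
}

func main() {
	var src lineSource = &sliceSource{lines: []string{"a\n", "b\r\n"}}
	src = &stripNewline{in: src} // wrap, just as createLineProcessor wraps stages
	for {
		line, err := src.Next()
		if err != nil {
			return
		}
		fmt.Printf("%q\n", line)
	}
}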