diff --git a/filebeat/config/config.go b/filebeat/config/config.go
index f70e63daada2..c5cc8271bbdd 100644
--- a/filebeat/config/config.go
+++ b/filebeat/config/config.go
@@ -14,19 +14,10 @@ import (
 
 // Defaults for config variables which are not set
 const (
-	DefaultRegistryFile                      = "registry"
-	DefaultCloseOlder          time.Duration = 1 * time.Hour
-	DefaultSpoolSize           uint64        = 2048
-	DefaultIdleTimeout         time.Duration = 5 * time.Second
-	DefaultHarvesterBufferSize int           = 16 << 10 // 16384
-	DefaultInputType                         = "log"
-	DefaultDocumentType                      = "log"
-	DefaultTailFiles                         = false
-	DefaultBackoff                           = 1 * time.Second
-	DefaultBackoffFactor                     = 2
-	DefaultMaxBackoff                        = 10 * time.Second
-	DefaultForceCloseFiles                   = false
-	DefaultMaxBytes                          = 10 * (1 << 20) // 10MB
+	DefaultRegistryFile string        = "registry"
+	DefaultSpoolSize    uint64        = 2048
+	DefaultIdleTimeout  time.Duration = 5 * time.Second
+	DefaultInputType                  = "log"
 )
 
 type Config struct {
@@ -122,7 +113,6 @@ func (config *Config) FetchConfigs() {
 	}
 
 	err = mergeConfigFiles(configFiles, config)
-
 	if err != nil {
 		log.Fatal("Error merging config files: ", err)
 	}
diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go
index 36c8602c6894..01e7906c38d6 100644
--- a/filebeat/crawler/crawler.go
+++ b/filebeat/crawler/crawler.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/filebeat/prospector"
 	"github.com/elastic/beats/filebeat/spooler"
 	"github.com/elastic/beats/libbeat/common"
@@ -42,7 +42,7 @@ func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler
 	}, nil
 }
 
-func (c *Crawler) Start(states input.States) error {
+func (c *Crawler) Start(states file.States) error {
 
 	logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))
diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go
index 9ca139af07a6..c4c7e257eb35 100644
--- a/filebeat/harvester/config.go
+++ b/filebeat/harvester/config.go
@@ -6,42 +6,42 @@ import (
 	"time"
 
 	cfg "github.com/elastic/beats/filebeat/config"
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/harvester/processor"
 	"github.com/elastic/beats/libbeat/common"
 )
 
 var (
 	defaultConfig = harvesterConfig{
-		BufferSize:      cfg.DefaultHarvesterBufferSize,
-		DocumentType:    cfg.DefaultDocumentType,
+		BufferSize:      16 << 10, // 16384
+		DocumentType:    "log",
 		InputType:       cfg.DefaultInputType,
-		TailFiles:       cfg.DefaultTailFiles,
-		Backoff:         cfg.DefaultBackoff,
-		BackoffFactor:   cfg.DefaultBackoffFactor,
-		MaxBackoff:      cfg.DefaultMaxBackoff,
-		CloseOlder:      cfg.DefaultCloseOlder,
-		ForceCloseFiles: cfg.DefaultForceCloseFiles,
-		MaxBytes:        cfg.DefaultMaxBytes,
+		TailFiles:       false,
+		Backoff:         1 * time.Second,
+		BackoffFactor:   2,
+		MaxBackoff:      10 * time.Second,
+		CloseOlder:      1 * time.Hour,
+		ForceCloseFiles: false,
+		MaxBytes:        10 * (1 << 20), // 10MB
 	}
 )
 
 type harvesterConfig struct {
-	common.EventMetadata `config:",inline"` // Fields and tags to add to events.
-	BufferSize      int              `config:"harvester_buffer_size"`
-	DocumentType    string           `config:"document_type"`
-	Encoding        string           `config:"encoding"`
-	InputType       string           `config:"input_type"`
-	TailFiles       bool             `config:"tail_files"`
-	Backoff         time.Duration    `config:"backoff" validate:"min=0,nonzero"`
-	BackoffFactor   int              `config:"backoff_factor" validate:"min=1"`
-	MaxBackoff      time.Duration    `config:"max_backoff" validate:"min=0,nonzero"`
-	CloseOlder      time.Duration    `config:"close_older"`
-	ForceCloseFiles bool             `config:"force_close_files"`
-	ExcludeLines    []*regexp.Regexp `config:"exclude_lines"`
-	IncludeLines    []*regexp.Regexp `config:"include_lines"`
-	MaxBytes        int              `config:"max_bytes" validate:"min=0,nonzero"`
-	Multiline       *input.MultilineConfig `config:"multiline"`
-	JSON            *input.JSONConfig      `config:"json"`
+	common.EventMetadata `config:",inline"` // Fields and tags to add to events.
+	BufferSize           int              `config:"harvester_buffer_size"`
+	DocumentType         string           `config:"document_type"`
+	Encoding             string           `config:"encoding"`
+	InputType            string           `config:"input_type"`
+	TailFiles            bool             `config:"tail_files"`
+	Backoff              time.Duration    `config:"backoff" validate:"min=0,nonzero"`
+	BackoffFactor        int              `config:"backoff_factor" validate:"min=1"`
+	MaxBackoff           time.Duration    `config:"max_backoff" validate:"min=0,nonzero"`
+	CloseOlder           time.Duration    `config:"close_older"`
+	ForceCloseFiles      bool             `config:"force_close_files"`
+	ExcludeLines         []*regexp.Regexp `config:"exclude_lines"`
+	IncludeLines         []*regexp.Regexp `config:"include_lines"`
+	MaxBytes             int              `config:"max_bytes" validate:"min=0,nonzero"`
+	Multiline            *processor.MultilineConfig `config:"multiline"`
+	JSON                 *processor.JSONConfig      `config:"json"`
 }
 
 func (config *harvesterConfig) Validate() error {
diff --git a/filebeat/harvester/file.go b/filebeat/harvester/file.go
deleted file mode 100644
index d3ed618a6822..000000000000
--- a/filebeat/harvester/file.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package harvester
-
-// File harvester crawls full file and sends them to tika for "indexing".
-// This is a potential substitute for fsriver
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index a7f5308acffe..40d8ff2ec47c 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -20,7 +20,9 @@ import (
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
+	"github.com/elastic/beats/filebeat/harvester/source"
 	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/common"
 )
 
@@ -28,11 +30,11 @@ type Harvester struct {
 	Path               string /* the file path to harvest */
 	Config             harvesterConfig
 	offset             int64
-	State              input.FileState
+	State              file.State
 	stateMutex         sync.Mutex
 	SpoolerChan        chan *input.FileEvent
 	encoding           encoding.EncodingFactory
-	file               FileSource /* the file being watched */
+	file               source.FileSource /* the file being watched */
 	ExcludeLinesRegexp []*regexp.Regexp
 	IncludeLinesRegexp []*regexp.Regexp
 	done               chan struct{}
@@ -41,7 +43,7 @@ func NewHarvester(
 	cfg *common.Config,
 	path string,
-	state input.FileState,
+	state file.State,
 	spooler chan *input.FileEvent,
 	offset int64,
 	done chan struct{},
diff --git a/filebeat/harvester/harvester_test.go b/filebeat/harvester/harvester_test.go
index 175969d9f8ca..51a7ac2582a1 100644
--- a/filebeat/harvester/harvester_test.go
+++ b/filebeat/harvester/harvester_test.go
@@ -19,5 +19,4 @@ func TestExampleTest(t *testing.T) {
 	}
 
 	assert.Equal(t, "/var/log/", h.Path)
-
 }
diff --git a/filebeat/harvester/linereader.go b/filebeat/harvester/linereader.go
deleted file mode 100644
index 8d4481ba762c..000000000000
--- a/filebeat/harvester/linereader.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package harvester
-
-import (
-	"golang.org/x/text/encoding"
-
-	"github.com/elastic/beats/filebeat/harvester/processor"
-	"github.com/elastic/beats/filebeat/input"
-)
-
-func createLineReader(
-	in FileSource,
-	codec encoding.Encoding,
-	bufferSize int,
-	maxBytes int,
-	readerConfig logFileReaderConfig,
-	jsonConfig *input.JSONConfig,
-	mlrConfig *input.MultilineConfig,
-	done chan struct{},
-) (processor.LineProcessor, error) {
-	var p processor.LineProcessor
-	var err error
-
-	fileReader, err := newLogFileReader(in, readerConfig, done)
-	if err != nil {
-		return nil, err
-	}
-
-	p, err = processor.NewLineSource(fileReader, codec, bufferSize)
-	if err != nil {
-		return nil, err
-	}
-
-	if jsonConfig != nil {
-		p = processor.NewJSONProcessor(p, jsonConfig)
-	}
-
-	p = processor.NewStripNewline(p)
-	if mlrConfig != nil {
-		p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return processor.NewLimitProcessor(p, maxBytes), nil
-}
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index 905ac78f6af0..18733daf8ad8 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -8,12 +8,18 @@ import (
 	"github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester/encoding"
+	"github.com/elastic/beats/filebeat/harvester/processor"
+	"github.com/elastic/beats/filebeat/harvester/reader"
+	"github.com/elastic/beats/filebeat/harvester/source"
 	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/logp"
 )
 
 // Log harvester reads files line by line and sends events to the defined output
 func (h *Harvester) Harvest() {
+
+	// Make sure the file is properly closed when the harvester is stopped
 	defer h.close()
 
 	h.State.Finished = false
@@ -30,15 +36,15 @@ func (h *Harvester) Harvest() {
 	// for new lines in input stream. Simple 8-bit based encodings, or plain
 	// don't require 'complicated' logic.
 	cfg := h.Config
-	readerConfig := logFileReaderConfig{
-		forceClose:         cfg.ForceCloseFiles,
-		closeOlder:         cfg.CloseOlder,
-		backoffDuration:    cfg.Backoff,
-		maxBackoffDuration: cfg.MaxBackoff,
-		backoffFactor:      cfg.BackoffFactor,
+	readerConfig := reader.LogFileReaderConfig{
+		ForceClose:         cfg.ForceCloseFiles,
+		CloseOlder:         cfg.CloseOlder,
+		BackoffDuration:    cfg.Backoff,
+		MaxBackoffDuration: cfg.MaxBackoff,
+		BackoffFactor:      cfg.BackoffFactor,
 	}
 
-	reader, err := createLineReader(
+	processor, err := createLineProcessor(
 		h.file, enc, cfg.BufferSize, cfg.MaxBytes, readerConfig,
 		cfg.JSON, cfg.Multiline, h.done)
 	if err != nil {
@@ -60,10 +66,10 @@
 		}
 
 		// Partial lines return error and are only read on completion
-		ts, text, bytesRead, jsonFields, err := readLine(reader)
+		ts, text, bytesRead, jsonFields, err := readLine(processor)
 		if err != nil {
-			if err == errFileTruncate {
-				logp.Warn("File was truncated. Begin reading file from offset 0: %s", h.Path)
+			if err == reader.ErrFileTruncate {
+				logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
 				h.SetOffset(0)
 				return
 			}
@@ -151,24 +157,22 @@ func (h *Harvester) shouldExportLine(line string) bool {
 // is returned and the harvester is closed. The file will be picked up again the next time
 // the file system is scanned
 func (h *Harvester) openFile() (encoding.Encoding, error) {
-	var file *os.File
-	var err error
 	var encoding encoding.Encoding
 
-	file, err = input.ReadOpen(h.Path)
+	f, err := file.ReadOpen(h.Path)
 	if err == nil {
 		// Check we are not following a rabbit hole (symlinks, etc.)
-		if !input.IsRegularFile(file) {
+		if !file.IsRegularFile(f) {
 			return nil, errors.New("Given file is not a regular file.")
 		}
 
-		encoding, err = h.encoding(file)
+		encoding, err = h.encoding(f)
 		if err != nil {
 			if err == transform.ErrShortSrc {
-				logp.Info("Initialising encoding for '%v' failed due to file being too short", file)
+				logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
 			} else {
-				logp.Err("Initialising encoding for '%v' failed: %v", file, err)
+				logp.Err("Initialising encoding for '%v' failed: %v", f, err)
 			}
 			return nil, err
 		}
@@ -179,13 +183,13 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
 	}
 
 	// update file offset
-	err = h.initFileOffset(file)
+	err = h.initFileOffset(f)
 	if err != nil {
 		return nil, err
 	}
 
 	// yay, open file
-	h.file = fileSource{file}
+	h.file = source.File{f}
 	return encoding, nil
 }
 
@@ -234,7 +238,7 @@ func (h *Harvester) SendStateUpdate() bool {
 	return h.sendEvent(h.createEvent())
 }
 
-func (h *Harvester) GetState() input.FileState {
+func (h *Harvester) GetState() file.State {
 	h.stateMutex.Lock()
 	defer h.stateMutex.Unlock()
 
@@ -244,10 +248,9 @@
 
 // refreshState refreshes the values in State with the values from the harvester itself
 func (h *Harvester) refreshState() {
-	h.State.Source = h.Path
 	h.State.Offset = h.getOffset()
-	h.State.FileStateOS = input.GetOSFileState(h.State.Fileinfo)
+	h.State.FileStateOS = file.GetOSState(h.State.Fileinfo)
 }
 
 func (h *Harvester) close() {
@@ -265,6 +268,44 @@
 		h.file.Close()
 		logp.Debug("harvester", "Stopping harvester, closing file: %s", h.Path)
 	} else {
-		logp.Debug("harvester", "Stopping harvester, NOT closing file as file info not available: %s", h.Path)
+		logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.Path)
+	}
+}
+
+func createLineProcessor(
+	in source.FileSource,
+	codec encoding.Encoding,
+	bufferSize int,
+	maxBytes int,
+	readerConfig reader.LogFileReaderConfig,
+	jsonConfig *processor.JSONConfig,
+	mlrConfig *processor.MultilineConfig,
+	done chan struct{},
+) (processor.LineProcessor, error) {
+	var p processor.LineProcessor
+	var err error
+
+	fileReader, err := reader.NewLogFileReader(in, readerConfig, done)
+	if err != nil {
+		return nil, err
+	}
+
+	p, err = processor.NewLineEncoder(fileReader, codec, bufferSize)
+	if err != nil {
+		return nil, err
+	}
+
+	if jsonConfig != nil {
+		p = processor.NewJSONProcessor(p, jsonConfig)
+	}
+
+	p = processor.NewStripNewline(p)
+	if mlrConfig != nil {
+		p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return processor.NewLimitProcessor(p, maxBytes), nil
+}
diff --git a/filebeat/harvester/log_test.go b/filebeat/harvester/log_test.go
index e40a58e1312b..831c28387844 100644
--- a/filebeat/harvester/log_test.go
+++ b/filebeat/harvester/log_test.go
@@ -12,6 +12,8 @@ import (
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester/encoding"
+	"github.com/elastic/beats/filebeat/harvester/reader"
+	"github.com/elastic/beats/filebeat/harvester/source"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -58,34 +60,34 @@ func TestReadLine(t *testing.T) {
 	// Read only 10 bytes which is not the end of the file
 	codec, _ := encoding.Plain(file)
-	readConfig := logFileReaderConfig{
-		closeOlder:         500 * time.Millisecond,
-		backoffDuration:    100 * time.Millisecond,
-		maxBackoffDuration: 1 * time.Second,
-		backoffFactor:      2,
+	readConfig := reader.LogFileReaderConfig{
+		CloseOlder:         500 * time.Millisecond,
+		BackoffDuration:    100 * time.Millisecond,
+		MaxBackoffDuration: 1 * time.Second,
+		BackoffFactor:      2,
 	}
-	reader, _ := createLineReader(fileSource{readFile}, codec, 100, 1000, readConfig, nil, nil, nil)
+	r, _ := createLineProcessor(source.File{readFile}, codec, 100, 1000, readConfig, nil, nil, nil)
 
 	// Read third line
-	_, text, bytesread, _, err := readLine(reader)
+	_, text, bytesread, _, err := readLine(r)
 	fmt.Printf("received line: '%s'\n", text)
 	assert.Nil(t, err)
 	assert.Equal(t, text, firstLineString[0:len(firstLineString)-1])
 	assert.Equal(t, bytesread, len(firstLineString))
 
 	// read second line
-	_, text, bytesread, _, err = readLine(reader)
+	_, text, bytesread, _, err = readLine(r)
 	fmt.Printf("received line: '%s'\n", text)
 	assert.Equal(t, text, secondLineString[0:len(secondLineString)-1])
 	assert.Equal(t, bytesread, len(secondLineString))
 	assert.Nil(t, err)
 
 	// Read third line, which doesn't exist
-	_, text, bytesread, _, err = readLine(reader)
+	_, text, bytesread, _, err = readLine(r)
 	fmt.Printf("received line: '%s'\n", text)
 	assert.Equal(t, "", text)
 	assert.Equal(t, bytesread, 0)
-	assert.Equal(t, err, errInactive)
+	assert.Equal(t, err, reader.ErrInactive)
 }
 
 func TestExcludeLine(t *testing.T) {
diff --git a/filebeat/harvester/processor/encoder.go b/filebeat/harvester/processor/encoder.go
new file mode 100644
index 000000000000..9079b23a5a52
--- /dev/null
+++ b/filebeat/harvester/processor/encoder.go
@@ -0,0 +1,32 @@
+package processor
+
+import (
+	"io"
+	"time"
+
+	"github.com/elastic/beats/filebeat/harvester/encoding"
+	"github.com/elastic/beats/filebeat/harvester/reader"
+)
+
+// LineEncoder produces lines by reading lines from an io.Reader
+// through a decoder, converting the reader's encoding to utf-8.
+type LineEncoder struct {
+	reader *reader.Line
+}
+
+// NewLineEncoder creates a new LineEncoder from input reader by applying
+// the given codec.
+func NewLineEncoder(
+	in io.Reader,
+	codec encoding.Encoding,
+	bufferSize int,
+) (LineEncoder, error) {
+	r, err := reader.NewLine(in, codec, bufferSize)
+	return LineEncoder{r}, err
+}
+
+// Next reads the next line from its initial io.Reader
+func (p LineEncoder) Next() (Line, error) {
+	c, sz, err := p.reader.Next()
+	return Line{Ts: time.Now(), Content: c, Bytes: sz}, err
+}
diff --git a/filebeat/harvester/processor/json.go b/filebeat/harvester/processor/json.go
new file mode 100644
index 000000000000..b5cbfcce1b41
--- /dev/null
+++ b/filebeat/harvester/processor/json.go
@@ -0,0 +1,76 @@
+package processor
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+const (
+	JsonErrorKey = "json_error"
+)
+
+type JSONProcessor struct {
+	reader LineProcessor
+	cfg    *JSONConfig
+}
+
+type JSONConfig struct {
+	MessageKey    string `config:"message_key"`
+	KeysUnderRoot bool   `config:"keys_under_root"`
+	OverwriteKeys bool   `config:"overwrite_keys"`
+	AddErrorKey   bool   `config:"add_error_key"`
+}
+
+// NewJSONProcessor creates a new processor that can decode JSON.
+func NewJSONProcessor(in LineProcessor, cfg *JSONConfig) *JSONProcessor {
+	return &JSONProcessor{reader: in, cfg: cfg}
+}
+
+// decodeJSON unmarshals the text parameter into a MapStr and
+// returns the new text column if one was requested.
+func (p *JSONProcessor) decodeJSON(text []byte) ([]byte, common.MapStr) {
+	var jsonFields common.MapStr
+	err := json.Unmarshal(text, &jsonFields)
+	if err != nil {
+		logp.Err("Error decoding JSON: %v", err)
+		if p.cfg.AddErrorKey {
+			jsonFields = common.MapStr{JsonErrorKey: fmt.Sprintf("Error decoding JSON: %v", err)}
+		}
+		return text, jsonFields
+	}
+
+	if len(p.cfg.MessageKey) == 0 {
+		return []byte(""), jsonFields
+	}
+
+	textValue, ok := jsonFields[p.cfg.MessageKey]
+	if !ok {
+		if p.cfg.AddErrorKey {
+			jsonFields[JsonErrorKey] = fmt.Sprintf("Key '%s' not found", p.cfg.MessageKey)
+		}
+		return []byte(""), jsonFields
+	}
+
+	textString, ok := textValue.(string)
+	if !ok {
+		if p.cfg.AddErrorKey {
+			jsonFields[JsonErrorKey] = fmt.Sprintf("Value of key '%s' is not a string", p.cfg.MessageKey)
+		}
+		return []byte(""), jsonFields
+	}
+
+	return []byte(textString), jsonFields
+}
+
+// Next decodes JSON and returns the filled Line object.
+func (p *JSONProcessor) Next() (Line, error) {
+	line, err := p.reader.Next()
+	if err != nil {
+		return line, err
+	}
+	line.Content, line.Fields = p.decodeJSON(line.Content)
+	return line, nil
+}
diff --git a/filebeat/harvester/processor/limit.go b/filebeat/harvester/processor/limit.go
new file mode 100644
index 000000000000..372a17d78bfe
--- /dev/null
+++ b/filebeat/harvester/processor/limit.go
@@ -0,0 +1,22 @@
+package processor
+
+// LimitProcessor sets an upper limit on line length. Lines longer
+// than the max configured line length will be snapped short.
+type LimitProcessor struct {
+	reader   LineProcessor
+	maxBytes int
+}
+
+// NewLimitProcessor creates a new processor limiting the line length.
+func NewLimitProcessor(in LineProcessor, maxBytes int) *LimitProcessor {
+	return &LimitProcessor{reader: in, maxBytes: maxBytes}
+}
+
+// Next returns the next line.
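// Editor's note (not part of the patch): truncation below only shortens
// line.Content; line.Bytes keeps the full count read from the input, so
// file-offset accounting stays correct.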
+func (p *LimitProcessor) Next() (Line, error) {
+	line, err := p.reader.Next()
+	if len(line.Content) > p.maxBytes {
+		line.Content = line.Content[:p.maxBytes]
+	}
+	return line, err
+}
diff --git a/filebeat/harvester/processor/line.go b/filebeat/harvester/processor/line.go
new file mode 100644
index 000000000000..28b7617ef6c8
--- /dev/null
+++ b/filebeat/harvester/processor/line.go
@@ -0,0 +1,24 @@
+package processor
+
+import (
+	"time"
+
+	"github.com/elastic/beats/libbeat/common"
+)
+
+// Line represents a line event with timestamp, content and actual number
+// of bytes read from input before decoding.
+type Line struct {
+	Ts      time.Time     // timestamp the line was read
+	Content []byte        // actual line read
+	Bytes   int           // total number of bytes read to generate the line
+	Fields  common.MapStr // optional fields that can be added by processors
+}
+
+// LineProcessor is the interface that wraps the basic Next method for
+// getting a new line.
+// Next returns the line being read or an error. EOF is returned
+// if the processor will not return any new lines on subsequent calls.
+type LineProcessor interface {
+	Next() (Line, error)
+}
diff --git a/filebeat/harvester/processor/multiline.go b/filebeat/harvester/processor/multiline.go
index d906deec15d0..16d4596ba6f8 100644
--- a/filebeat/harvester/processor/multiline.go
+++ b/filebeat/harvester/processor/multiline.go
@@ -6,7 +6,6 @@ import (
 	"regexp"
 	"time"
 
-	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/libbeat/common"
 )
 
@@ -61,7 +60,7 @@ func NewMultiline(
 	r LineProcessor,
 	separator string,
 	maxBytes int,
-	config *input.MultilineConfig,
+	config *MultilineConfig,
 ) (*MultiLine, error) {
 	types := map[string]func(*regexp.Regexp) (matcher, error){
 		"before": beforeMatcher,
diff --git a/filebeat/harvester/processor/multiline_config.go b/filebeat/harvester/processor/multiline_config.go
new file mode 100644
index 000000000000..9e18f921cf2e
--- /dev/null
+++ b/filebeat/harvester/processor/multiline_config.go
@@ -0,0 +1,22 @@
+package processor
+
+import (
+	"fmt"
+	"regexp"
+	"time"
+)
+
+type MultilineConfig struct {
+	Negate   bool           `config:"negate"`
+	Match    string         `config:"match" validate:"required"`
+	MaxLines *int           `config:"max_lines"`
+	Pattern  *regexp.Regexp `config:"pattern"`
+	Timeout  *time.Duration `config:"timeout" validate:"positive"`
+}
+
+func (c *MultilineConfig) Validate() error {
+	if c.Match != "after" && c.Match != "before" {
+		return fmt.Errorf("unknown matcher type: %s", c.Match)
+	}
+	return nil
+}
diff --git a/filebeat/harvester/processor/multiline_test.go b/filebeat/harvester/processor/multiline_test.go
index a8f014e80849..705cf9297c4e 100644
--- a/filebeat/harvester/processor/multiline_test.go
+++ b/filebeat/harvester/processor/multiline_test.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester/encoding"
-	"github.com/elastic/beats/filebeat/input"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -26,7 +25,7 @@ func (p bufferSource) Continuable() bool { return false }
 
 func TestMultilineAfterOK(t *testing.T) {
 	testMultilineOK(t,
-		input.MultilineConfig{
+		MultilineConfig{
 			Pattern: regexp.MustCompile(`^[ \t] +`), // next line is indented by spaces
 			Match:   "after",
 		},
@@ -37,7 +36,7 @@ func TestMultilineAfterOK(t *testing.T) {
 
 func TestMultilineBeforeOK(t *testing.T) {
 	testMultilineOK(t,
-		input.MultilineConfig{
+		MultilineConfig{
 			Pattern: regexp.MustCompile(`\\$`), // previous line ends with \
 			Match:   "before",
 		},
@@ -48,7 +47,7 @@ func TestMultilineBeforeOK(t *testing.T) {
 
 func TestMultilineAfterNegateOK(t *testing.T) {
 	testMultilineOK(t,
-		input.MultilineConfig{
+		MultilineConfig{
 			Pattern: regexp.MustCompile(`^-`), // first line starts with '-' at beginning of line
 			Negate:  true,
 			Match:   "after",
@@ -60,7 +59,7 @@ func TestMultilineAfterNegateOK(t *testing.T) {
 
 func TestMultilineBeforeNegateOK(t *testing.T) {
 	testMultilineOK(t,
-		input.MultilineConfig{
+		MultilineConfig{
 			Pattern: regexp.MustCompile(`;$`), // last line ends with ';'
 			Negate:  true,
 			Match:   "before",
@@ -70,7 +69,7 @@ func TestMultilineBeforeNegateOK(t *testing.T) {
 	)
 }
 
-func testMultilineOK(t *testing.T, cfg input.MultilineConfig, expected ...string) {
+func testMultilineOK(t *testing.T, cfg MultilineConfig, expected ...string) {
 	_, buf := createLineBuffer(expected...)
 	reader := createMultilineTestReader(t, buf, cfg)
 
@@ -96,7 +95,7 @@ func testMultilineOK(t *testing.T, cfg input.MultilineConfig, expected ...string
 	}
 }
 
-func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg input.MultilineConfig) LineProcessor {
+func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg MultilineConfig) LineProcessor {
 	encFactory, ok := encoding.FindEncoding("plain")
 	if !ok {
 		t.Fatalf("unable to find 'plain' encoding")
@@ -108,7 +107,7 @@ func createMultilineTestReader(t *testing.T, in *bytes.Buffer, cfg input.Multili
 	}
 
 	var reader LineProcessor
-	reader, err = NewLineSource(in, enc, 4096)
+	reader, err = NewLineEncoder(in, enc, 4096)
 	if err != nil {
 		t.Fatalf("Failed to initialize line reader: %v", err)
 	}
diff --git a/filebeat/harvester/processor/processor.go b/filebeat/harvester/processor/processor.go
index 74af68bee328..95af6c9f731f 100644
--- a/filebeat/harvester/processor/processor.go
+++ b/filebeat/harvester/processor/processor.go
@@ -1,175 +1 @@
 package processor
-
-import (
-	"encoding/json"
-	"fmt"
-	"io"
-	"time"
-
-	"github.com/elastic/beats/filebeat/harvester/encoding"
-	"github.com/elastic/beats/filebeat/input"
-	"github.com/elastic/beats/libbeat/common"
-	"github.com/elastic/beats/libbeat/logp"
-)
-
-const (
-	jsonErrorKey = "json_error"
-)
-
-// Line represents a line event with timestamp, content and actual number
-// of bytes read from input before decoding.
-type Line struct {
-	Ts      time.Time     // timestamp the line was read
-	Content []byte        // actual line read
-	Bytes   int           // total number of bytes read to generate the line
-	Fields  common.MapStr // optional fields that can be added by processors
-}
-
-// LineProcessor is the interface that wraps the basic Next method for
-// getting a new line.
-// Next returns the line being read or and error. EOF is returned
-// if processor will not return any new lines on subsequent calls.
-type LineProcessor interface {
-	Next() (Line, error)
-}
-
-// LineSource produces lines by reading lines from an io.Reader
-// through a decoder converting the reader it's encoding to utf-8.
-type LineSource struct {
-	reader *encoding.LineReader
-}
-
-// StripNewline processor removes the last trailing newline characters from
-// read lines.
-type StripNewline struct {
-	reader LineProcessor
-}
-
-// LimitProcessor sets an upper limited on line length. Lines longer
-// then the max configured line length will be snapped short.
-type LimitProcessor struct {
-	reader   LineProcessor
-	maxBytes int
-}
-
-type JSONProcessor struct {
-	reader LineProcessor
-	cfg    *input.JSONConfig
-}
-
-// NewLineSource creates a new LineSource from input reader by applying
-// the given codec.
-func NewLineSource(
-	in io.Reader,
-	codec encoding.Encoding,
-	bufferSize int,
-) (LineSource, error) {
-	r, err := encoding.NewLineReader(in, codec, bufferSize)
-	return LineSource{r}, err
-}
-
-// Next reads the next line from it's initial io.Reader
-func (p LineSource) Next() (Line, error) {
-	c, sz, err := p.reader.Next()
-	return Line{Ts: time.Now(), Content: c, Bytes: sz}, err
-}
-
-// NewStripNewline creates a new line reader stripping the last tailing newline.
-func NewStripNewline(r LineProcessor) *StripNewline {
-	return &StripNewline{r}
-}
-
-// Next returns the next line.
-func (p *StripNewline) Next() (Line, error) {
-	line, err := p.reader.Next()
-	if err != nil {
-		return line, err
-	}
-
-	L := line.Content
-	line.Content = L[:len(L)-lineEndingChars(L)]
-	return line, err
-}
-
-// NewLimitProcessor creates a new processor limiting the line length.
-func NewLimitProcessor(in LineProcessor, maxBytes int) *LimitProcessor {
-	return &LimitProcessor{reader: in, maxBytes: maxBytes}
-}
-
-// Next returns the next line.
-func (p *LimitProcessor) Next() (Line, error) {
-	line, err := p.reader.Next()
-	if len(line.Content) > p.maxBytes {
-		line.Content = line.Content[:p.maxBytes]
-	}
-	return line, err
-}
-
-// NewJSONProcessor creates a new processor that can decode JSON.
-func NewJSONProcessor(in LineProcessor, cfg *input.JSONConfig) *JSONProcessor {
-	return &JSONProcessor{reader: in, cfg: cfg}
-}
-
-// decodeJSON unmarshals the text parameter into a MapStr and
-// returns the new text column if one was requested.
-func (p *JSONProcessor) decodeJSON(text []byte) ([]byte, common.MapStr) {
-	var jsonFields common.MapStr
-	err := json.Unmarshal(text, &jsonFields)
-	if err != nil {
-		logp.Err("Error decoding JSON: %v", err)
-		if p.cfg.AddErrorKey {
-			jsonFields = common.MapStr{jsonErrorKey: fmt.Sprintf("Error decoding JSON: %v", err)}
-		}
-		return text, jsonFields
-	}
-
-	if len(p.cfg.MessageKey) == 0 {
-		return []byte(""), jsonFields
-	}
-
-	textValue, ok := jsonFields[p.cfg.MessageKey]
-	if !ok {
-		if p.cfg.AddErrorKey {
-			jsonFields[jsonErrorKey] = fmt.Sprintf("Key '%s' not found", p.cfg.MessageKey)
-		}
-		return []byte(""), jsonFields
-	}
-
-	textString, ok := textValue.(string)
-	if !ok {
-		if p.cfg.AddErrorKey {
-			jsonFields[jsonErrorKey] = fmt.Sprintf("Value of key '%s' is not a string", p.cfg.MessageKey)
-		}
-		return []byte(""), jsonFields
-	}
-
-	return []byte(textString), jsonFields
-}
-
-// Next decodes JSON and returns the filled Line object.
-func (p *JSONProcessor) Next() (Line, error) {
-	line, err := p.reader.Next()
-	if err != nil {
-		return line, err
-	}
-	line.Content, line.Fields = p.decodeJSON(line.Content)
-	return line, nil
-}
-
-// isLine checks if the given byte array is a line, means has a line ending \n
-func isLine(l []byte) bool {
-	return l != nil && len(l) > 0 && l[len(l)-1] == '\n'
-}
-
-// lineEndingChars returns the number of line ending chars the given by array has
-// In case of Unix/Linux files, it is -1, in case of Windows mostly -2
-func lineEndingChars(l []byte) int {
-	if !isLine(l) {
-		return 0
-	}
-
-	if len(l) > 1 && l[len(l)-2] == '\r' {
-		return 2
-	}
-	return 1
-}
diff --git a/filebeat/harvester/processor/processor_test.go b/filebeat/harvester/processor/processor_test.go
index bff2d3251d57..6cc1a251ed52 100644
--- a/filebeat/harvester/processor/processor_test.go
+++ b/filebeat/harvester/processor/processor_test.go
@@ -5,7 +5,6 @@ package processor
 import (
 	"testing"
 
-	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/stretchr/testify/assert"
 )
@@ -46,7 +45,7 @@ func TestLineEndingChars(t *testing.T) {
 func TestDecodeJSON(t *testing.T) {
 	type io struct {
 		Text         string
-		Config       input.JSONConfig
+		Config       JSONConfig
 		ExpectedText string
 		ExpectedMap  common.MapStr
 	}
@@ -54,60 +53,60 @@ func TestDecodeJSON(t *testing.T) {
 	var tests = []io{
 		{
 			Text:         `{"message": "test", "value": 1}`,
-			Config:       input.JSONConfig{MessageKey: "message"},
+			Config:       JSONConfig{MessageKey: "message"},
 			ExpectedText: "test",
 			ExpectedMap:  common.MapStr{"message": "test", "value": float64(1)},
 		},
 		{
 			Text:         `{"message": "test", "value": 1}`,
-			Config:       input.JSONConfig{MessageKey: "message1"},
+			Config:       JSONConfig{MessageKey: "message1"},
 			ExpectedText: "",
 			ExpectedMap:  common.MapStr{"message": "test", "value": float64(1)},
 		},
 		{
 			Text:         `{"message": "test", "value": 1}`,
-			Config:       input.JSONConfig{MessageKey: "value"},
+			Config:       JSONConfig{MessageKey: "value"},
 			ExpectedText: "",
 			ExpectedMap:  common.MapStr{"message": "test", "value": float64(1)},
 		},
 		{
 			Text:         `{"message": "test", "value": "1"}`,
-			Config:       input.JSONConfig{MessageKey: "value"},
+			Config:       JSONConfig{MessageKey: "value"},
 			ExpectedText: "1",
 			ExpectedMap:  common.MapStr{"message": "test", "value": "1"},
 		},
 		{
 			// in case of JSON decoding errors, the text is passed as is
 			Text:         `{"message": "test", "value": "`,
-			Config:       input.JSONConfig{MessageKey: "value"},
+			Config:       JSONConfig{MessageKey: "value"},
 			ExpectedText: `{"message": "test", "value": "`,
 			ExpectedMap:  nil,
 		},
 		{
 			// Add key error helps debugging this
 			Text:         `{"message": "test", "value": "`,
-			Config:       input.JSONConfig{MessageKey: "value", AddErrorKey: true},
+			Config:       JSONConfig{MessageKey: "value", AddErrorKey: true},
 			ExpectedText: `{"message": "test", "value": "`,
 			ExpectedMap:  common.MapStr{"json_error": "Error decoding JSON: unexpected end of JSON input"},
 		},
 		{
 			// If the text key is not found, put an error
 			Text:         `{"message": "test", "value": "1"}`,
-			Config:       input.JSONConfig{MessageKey: "hello", AddErrorKey: true},
+			Config:       JSONConfig{MessageKey: "hello", AddErrorKey: true},
 			ExpectedText: ``,
 			ExpectedMap:  common.MapStr{"message": "test", "value": "1", "json_error": "Key 'hello' not found"},
 		},
 		{
 			// If the text key is found, but not a string, put an error
 			Text:         `{"message": "test", "value": 1}`,
-			Config:       input.JSONConfig{MessageKey: "value", AddErrorKey: true},
+			Config:       JSONConfig{MessageKey: "value", AddErrorKey: true},
 			ExpectedText: ``,
 			ExpectedMap:  common.MapStr{"message": "test", "value": float64(1), "json_error": "Value of key 'value' is not a string"},
 		},
 		{
 			// Without a text key, simple return the json and an empty text
 			Text:         `{"message": "test", "value": 1}`,
-			Config:       input.JSONConfig{AddErrorKey: true},
+			Config:       JSONConfig{AddErrorKey: true},
 			ExpectedText: ``,
 			ExpectedMap:  common.MapStr{"message": "test", "value": float64(1)},
 		},
diff --git a/filebeat/harvester/processor/strip_newline.go b/filebeat/harvester/processor/strip_newline.go
new file mode 100644
index 000000000000..7c0bae8c21dc
--- /dev/null
+++ b/filebeat/harvester/processor/strip_newline.go
@@ -0,0 +1,42 @@
+package processor
+
+// StripNewline processor removes the last trailing newline characters from
+// read lines.
+type StripNewline struct {
+	reader LineProcessor
+}
+
+// NewStripNewline creates a new line reader stripping the last trailing newline.
+func NewStripNewline(r LineProcessor) *StripNewline {
+	return &StripNewline{r}
+}
+
+// Next returns the next line.
+func (p *StripNewline) Next() (Line, error) {
+	line, err := p.reader.Next()
+	if err != nil {
+		return line, err
+	}
+
+	L := line.Content
+	line.Content = L[:len(L)-lineEndingChars(L)]
+	return line, err
+}
+
+// isLine checks if the given byte array is a line, meaning it has a line ending \n
+func isLine(l []byte) bool {
+	return l != nil && len(l) > 0 && l[len(l)-1] == '\n'
+}
+
+// lineEndingChars returns the number of line ending chars the given byte array has
+// In case of Unix/Linux files, it is 1; in case of Windows, mostly 2
+func lineEndingChars(l []byte) int {
+	if !isLine(l) {
+		return 0
+	}
+
+	if len(l) > 1 && l[len(l)-2] == '\r' {
+		return 2
+	}
+	return 1
+}
diff --git a/filebeat/harvester/encoding/reader.go b/filebeat/harvester/reader/line.go
similarity index 92%
rename from filebeat/harvester/encoding/reader.go
rename to filebeat/harvester/reader/line.go
index 139cc582e24b..9501cf40b610 100644
--- a/filebeat/harvester/encoding/reader.go
+++ b/filebeat/harvester/reader/line.go
@@ -1,4 +1,4 @@
-package encoding
+package reader
 
 import (
 	"io"
@@ -12,7 +12,7 @@
 // lineReader reads lines from underlying reader, decoding the input stream
 // using the configured codec. The reader keeps track of bytes consumed
 // from raw input stream for every decoded line.
-type LineReader struct {
+type Line struct {
 	rawInput   io.Reader
 	codec      encoding.Encoding
 	bufferSize int
@@ -25,12 +25,12 @@ type LineReader struct {
 	decoder transform.Transformer
 }
 
-func NewLineReader(
+func NewLine(
 	input io.Reader,
 	codec encoding.Encoding,
 	bufferSize int,
-) (*LineReader, error) {
-	l := &LineReader{}
+) (*Line, error) {
+	l := &Line{}
 
 	if err := l.init(input, codec, bufferSize); err != nil {
 		return nil, err
@@ -39,7 +39,7 @@ func NewLineReader(
 	return l, nil
 }
 
-func (l *LineReader) init(
+func (l *Line) init(
 	input io.Reader,
 	codec encoding.Encoding,
 	bufferSize int,
@@ -61,7 +61,7 @@ func (l *Line) init(
 	return nil
 }
 
-func (l *LineReader) Next() ([]byte, int, error) {
+func (l *Line) Next() ([]byte, int, error) {
 	for {
 		// read next 'potential' line from input buffer/reader
 		err := l.advance()
@@ -91,7 +91,7 @@ func (l *Line) Next() ([]byte, int, error) {
 	return bytes, sz, nil
 }
 
-func (l *LineReader) advance() error {
+func (l *Line) advance() error {
 	var idx int
 	var err error
 
@@ -146,7 +146,7 @@ func (l *Line) advance() error {
 	return err
 }
 
-func (l *LineReader) decode(end int) (int, error) {
+func (l *Line) decode(end int) (int, error) {
 	var err error
 	buffer := make([]byte, 1024)
 	inBytes := l.inBuffer.Bytes()
diff --git a/filebeat/harvester/encoding/reader_test.go b/filebeat/harvester/reader/line_test.go
similarity index 93%
rename from filebeat/harvester/encoding/reader_test.go
rename to filebeat/harvester/reader/line_test.go
index a1382fff42be..9d79693d5293 100644
--- a/filebeat/harvester/encoding/reader_test.go
+++ b/filebeat/harvester/reader/line_test.go
@@ -1,12 +1,13 @@
 // +build !integration
 
-package encoding
+package reader
 
 import (
 	"bytes"
 	"math/rand"
 	"testing"
 
+	"github.com/elastic/beats/filebeat/harvester/encoding"
 	"github.com/stretchr/testify/assert"
 
 	"golang.org/x/text/transform"
@@ -31,7 +32,7 @@ func TestReaderEncodings(t *testing.T) {
 	for _, test := range tests {
 		t.Logf("test codec: %v", test.encoding)
 
-		codecFactory, ok := FindEncoding(test.encoding)
+		codecFactory, ok := encoding.FindEncoding(test.encoding)
 		if !ok {
 			t.Errorf("can not find encoding '%v'", test.encoding)
 			continue
@@ -50,7 +51,7 @@ func TestReaderEncodings(t *testing.T) {
 		}
 
 		// create line reader
-		reader, err := NewLineReader(buffer, codec, 1024)
+		reader, err := NewLine(buffer, codec, 1024)
 		if err != nil {
 			t.Errorf("failed to initialize reader: %v", err)
 			continue
@@ -140,8 +141,8 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
 
 	// initialize reader
 	buffer := bytes.NewBuffer(inputStream)
-	codec, _ := Plain(buffer)
-	reader, err := NewLineReader(buffer, codec, buffer.Len())
+	codec, _ := encoding.Plain(buffer)
+	reader, err := NewLine(buffer, codec, buffer.Len())
 	if err != nil {
 		t.Fatalf("Error initializing reader: %v", err)
 	}
diff --git a/filebeat/harvester/reader.go b/filebeat/harvester/reader/log.go
similarity index 69%
rename from filebeat/harvester/reader.go
rename to filebeat/harvester/reader/log.go
index 2ae3f16093c6..fb1f5a651502 100644
--- a/filebeat/harvester/reader.go
+++ b/filebeat/harvester/reader/log.go
@@ -1,4 +1,4 @@
-package harvester
+package reader
 
 import (
 	"errors"
@@ -6,37 +6,37 @@ import (
 	"os"
 	"time"
 
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/harvester/source"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/logp"
 )
 
-type logFileReader struct {
-	fs     FileSource
-	offset int64
-	config logFileReaderConfig
+var (
+	ErrFileTruncate = errors.New("detected file being truncated")
+	ErrForceClose   = errors.New("file must be closed")
+	ErrInactive     = errors.New("file inactive")
+)
 
+type logFileReader struct {
+	fs     source.FileSource
+	offset int64
+	config LogFileReaderConfig
 
 	lastTimeRead time.Time
 	backoff      time.Duration
 	done         chan struct{}
 }
 
-type logFileReaderConfig struct {
-	forceClose         bool
-	closeOlder         time.Duration
-	backoffDuration    time.Duration
-	maxBackoffDuration time.Duration
-	backoffFactor      int
+type LogFileReaderConfig struct {
+	ForceClose         bool
+	CloseOlder         time.Duration
+	BackoffDuration    time.Duration
+	MaxBackoffDuration time.Duration
+	BackoffFactor      int
 }
 
-var (
-	errFileTruncate = errors.New("detected file being truncated")
-	errForceClose   = errors.New("file must be closed")
-	errInactive     = errors.New("file inactive")
-)
-
-func newLogFileReader(
-	fs FileSource,
-	config logFileReaderConfig,
+func NewLogFileReader(
+	fs source.FileSource,
+	config LogFileReaderConfig,
 	done chan struct{},
 ) (*logFileReader, error) {
 	var offset int64
@@ -53,7 +53,7 @@ func NewLogFileReader(
 		offset:       offset,
 		config:       config,
 		lastTimeRead: time.Now(),
-		backoff:      config.backoffDuration,
+		backoff:      config.BackoffDuration,
 		done:         done,
 	}, nil
 }
@@ -74,7 +74,7 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
 	}
 	if err == nil {
 		// reset backoff
-		r.backoff = r.config.backoffDuration
+		r.backoff = r.config.BackoffDuration
 		return n, nil
 	}
@@ -103,26 +103,26 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
 		logp.Debug("harvester",
 			"File was truncated as offset (%s) > size (%s): %s",
 			r.offset, info.Size(), r.fs.Name())
-		return n, errFileTruncate
+		return n, ErrFileTruncate
 	}
 
 	age := time.Since(r.lastTimeRead)
-	if age > r.config.closeOlder {
+	if age > r.config.CloseOlder {
 		// If the file hasn't change for longer then maxInactive, harvester stops
 		// and file handle will be closed.
-		return n, errInactive
+		return n, ErrInactive
 	}
 
-	if r.config.forceClose {
+	if r.config.ForceClose {
 		// Check if the file name exists (see #93)
 		_, statErr := os.Stat(r.fs.Name())
 
 		// Error means file does not exist. If no error, check if same file. If
 		// not close as rotated.
- if statErr != nil || !input.IsSameFile(r.fs.Name(), info) { + if statErr != nil || !file.IsSameFile(r.fs.Name(), info) { logp.Info("Force close file: %s; error: %s", r.fs.Name(), statErr) // Return directly on windows -> file is closing - return n, errForceClose + return n, ErrForceClose } } @@ -144,10 +144,10 @@ func (r *logFileReader) wait() { time.Sleep(r.backoff) // Increment backoff up to maxBackoff - if r.backoff < r.config.maxBackoffDuration { - r.backoff = r.backoff * time.Duration(r.config.backoffFactor) - if r.backoff > r.config.maxBackoffDuration { - r.backoff = r.config.maxBackoffDuration + if r.backoff < r.config.MaxBackoffDuration { + r.backoff = r.backoff * time.Duration(r.config.BackoffFactor) + if r.backoff > r.config.MaxBackoffDuration { + r.backoff = r.config.MaxBackoffDuration } } } diff --git a/filebeat/harvester/source/file.go b/filebeat/harvester/source/file.go new file mode 100644 index 000000000000..d55fe2cfb723 --- /dev/null +++ b/filebeat/harvester/source/file.go @@ -0,0 +1,9 @@ +package source + +import "os" + +type File struct { + *os.File +} + +func (File) Continuable() bool { return true } diff --git a/filebeat/harvester/source/pipe.go b/filebeat/harvester/source/pipe.go new file mode 100644 index 000000000000..3b81b5a54ea0 --- /dev/null +++ b/filebeat/harvester/source/pipe.go @@ -0,0 +1,15 @@ +package source + +import "os" + +// restrict file to minimal interface of FileSource to prevent possible casts +// to additional interfaces supported by underlying file +type Pipe struct { + File *os.File +} + +func (p Pipe) Read(b []byte) (int, error) { return p.File.Read(b) } +func (p Pipe) Close() error { return p.File.Close() } +func (p Pipe) Name() string { return p.File.Name() } +func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() } +func (p Pipe) Continuable() bool { return false } diff --git a/filebeat/harvester/source/source.go b/filebeat/harvester/source/source.go new file mode 100644 index 000000000000..c913908d8685 --- /dev/null +++ b/filebeat/harvester/source/source.go @@ -0,0 +1,17 @@ +package source + +import ( + "io" + "os" +) + +type LogSource interface { + io.ReadCloser + Name() string +} + +type FileSource interface { + LogSource + Stat() (os.FileInfo, error) + Continuable() bool // can we continue processing after EOF? +} diff --git a/filebeat/harvester/sources.go b/filebeat/harvester/sources.go deleted file mode 100644 index 5b873bd6a90e..000000000000 --- a/filebeat/harvester/sources.go +++ /dev/null @@ -1,31 +0,0 @@ -package harvester - -import ( - "io" - "os" -) - -type LogSource interface { - io.ReadCloser - Name() string -} - -type FileSource interface { - LogSource - Stat() (os.FileInfo, error) - Continuable() bool // can we continue processing after EOF? 
-} - -// restrict file to minimal interface of FileSource to prevent possible casts -// to additional interfaces supported by underlying file -type pipeSource struct{ file *os.File } - -func (p pipeSource) Read(b []byte) (int, error) { return p.file.Read(b) } -func (p pipeSource) Close() error { return p.file.Close() } -func (p pipeSource) Name() string { return p.file.Name() } -func (p pipeSource) Stat() (os.FileInfo, error) { return p.file.Stat() } -func (p pipeSource) Continuable() bool { return false } - -type fileSource struct{ *os.File } - -func (fileSource) Continuable() bool { return true } diff --git a/filebeat/harvester/stdin.go b/filebeat/harvester/stdin.go index f799a1c5d36c..fc5a36fd3cc7 100644 --- a/filebeat/harvester/stdin.go +++ b/filebeat/harvester/stdin.go @@ -3,12 +3,13 @@ package harvester import ( "os" + "github.com/elastic/beats/filebeat/harvester/source" "golang.org/x/text/encoding" ) // Stdin reads all incoming traffic from stdin and sends it directly to the output func (h *Harvester) openStdin() (encoding.Encoding, error) { - h.file = pipeSource{os.Stdin} + h.file = source.Pipe{os.Stdin} return h.encoding(h.file) } diff --git a/filebeat/input/event.go b/filebeat/input/event.go index 1bcd312396ed..afada9f2478f 100644 --- a/filebeat/input/event.go +++ b/filebeat/input/event.go @@ -1,11 +1,13 @@ package input import ( - "fmt" "os" - "regexp" "time" + "fmt" + + "github.com/elastic/beats/filebeat/harvester/processor" + "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -22,30 +24,27 @@ type FileEvent struct { Text *string Fileinfo os.FileInfo JSONFields common.MapStr - JSONConfig *JSONConfig - FileState FileState -} - -type JSONConfig struct { - MessageKey string `config:"message_key"` - KeysUnderRoot bool `config:"keys_under_root"` - OverwriteKeys bool `config:"overwrite_keys"` - AddErrorKey bool `config:"add_error_key"` + JSONConfig *processor.JSONConfig + FileState file.State } -type MultilineConfig struct { - Negate bool `config:"negate"` - Match string `config:"match" validate:"required"` - MaxLines *int `config:"max_lines"` - Pattern *regexp.Regexp `config:"pattern"` - Timeout *time.Duration `config:"timeout" validate:"positive"` -} +func (f *FileEvent) ToMapStr() common.MapStr { + event := common.MapStr{ + common.EventMetadataKey: f.EventMetadata, + "@timestamp": common.Time(f.ReadTime), + "source": f.Source, + "offset": f.Offset, // Offset here is the offset before the starting char. 
+ "type": f.DocumentType, + "input_type": f.InputType, + } -func (c *MultilineConfig) Validate() error { - if c.Match != "after" && c.Match != "before" { - return fmt.Errorf("unknown matcher type: %s", c.Match) + if f.JSONConfig != nil && len(f.JSONFields) > 0 { + mergeJSONFields(f, event) + } else { + event["message"] = f.Text } - return nil + + return event } // mergeJSONFields writes the JSON fields in the event map, @@ -66,7 +65,7 @@ func mergeJSONFields(f *FileEvent, event common.MapStr) { vstr, ok := v.(string) if !ok { logp.Err("JSON: Won't overwrite @timestamp because value is not string") - event[jsonErrorKey] = "@timestamp not overwritten (not string)" + event[processor.JsonErrorKey] = "@timestamp not overwritten (not string)" continue } @@ -74,7 +73,7 @@ func mergeJSONFields(f *FileEvent, event common.MapStr) { ts, err := time.Parse(time.RFC3339, vstr) if err != nil { logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err) - event[jsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) + event[processor.JsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr) continue } event[k] = common.Time(ts) @@ -82,12 +81,12 @@ func mergeJSONFields(f *FileEvent, event common.MapStr) { vstr, ok := v.(string) if !ok { logp.Err("JSON: Won't overwrite type because value is not string") - event[jsonErrorKey] = "type not overwritten (not string)" + event[processor.JsonErrorKey] = "type not overwritten (not string)" continue } if len(vstr) == 0 || vstr[0] == '_' { logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore") - event[jsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) + event[processor.JsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr) continue } event[k] = vstr @@ -102,22 +101,3 @@ func mergeJSONFields(f *FileEvent, event common.MapStr) { event["json"] = f.JSONFields } } - -func (f *FileEvent) ToMapStr() common.MapStr { - event := common.MapStr{ - common.EventMetadataKey: f.EventMetadata, - "@timestamp": common.Time(f.ReadTime), - "source": f.Source, - "offset": f.Offset, // Offset here is the offset before the starting char. 
- "type": f.DocumentType, - "input_type": f.InputType, - } - - if f.JSONConfig != nil && len(f.JSONFields) > 0 { - mergeJSONFields(f, event) - } else { - event["message"] = f.Text - } - - return event -} diff --git a/filebeat/input/file_test.go b/filebeat/input/event_test.go similarity index 58% rename from filebeat/input/file_test.go rename to filebeat/input/event_test.go index 7f85cd61c121..f681e6705341 100644 --- a/filebeat/input/file_test.go +++ b/filebeat/input/event_test.go @@ -1,109 +1,14 @@ -// +build !integration - package input import ( - "io/ioutil" - "os" - "path/filepath" "testing" "time" + "github.com/elastic/beats/filebeat/harvester/processor" "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" ) -func TestIsSameFile(t *testing.T) { - absPath, err := filepath.Abs("../tests/files/") - - assert.NotNil(t, absPath) - assert.Nil(t, err) - - fileInfo1, err := os.Stat(absPath + "/logs/test.log") - fileInfo2, err := os.Stat(absPath + "/logs/system.log") - - assert.Nil(t, err) - assert.NotNil(t, fileInfo1) - assert.NotNil(t, fileInfo2) - - file1 := &File{ - FileInfo: fileInfo1, - } - - file2 := &File{ - FileInfo: fileInfo2, - } - - file3 := &File{ - FileInfo: fileInfo2, - } - - assert.False(t, file1.IsSameFile(file2)) - assert.False(t, file2.IsSameFile(file1)) - - assert.True(t, file1.IsSameFile(file1)) - assert.True(t, file2.IsSameFile(file2)) - - assert.True(t, file3.IsSameFile(file2)) - assert.True(t, file2.IsSameFile(file3)) -} - -func TestSafeFileRotateExistingFile(t *testing.T) { - - tempdir, err := ioutil.TempDir("", "") - assert.NoError(t, err) - defer func() { - assert.NoError(t, os.RemoveAll(tempdir)) - }() - - // create an existing registry file - err = ioutil.WriteFile(filepath.Join(tempdir, "registry"), - []byte("existing filebeat"), 0x777) - assert.NoError(t, err) - - // create a new registry.new file - err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"), - []byte("new filebeat"), 0x777) - assert.NoError(t, err) - - // rotate registry.new into registry - err = SafeFileRotate(filepath.Join(tempdir, "registry"), - filepath.Join(tempdir, "registry.new")) - assert.NoError(t, err) - - contents, err := ioutil.ReadFile(filepath.Join(tempdir, "registry")) - assert.NoError(t, err) - assert.Equal(t, []byte("new filebeat"), contents) - - // do it again to make sure we deal with deleting the old file - - err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"), - []byte("new filebeat 1"), 0x777) - assert.NoError(t, err) - - err = SafeFileRotate(filepath.Join(tempdir, "registry"), - filepath.Join(tempdir, "registry.new")) - assert.NoError(t, err) - - contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry")) - assert.NoError(t, err) - assert.Equal(t, []byte("new filebeat 1"), contents) - - // and again for good measure - - err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"), - []byte("new filebeat 2"), 0x777) - assert.NoError(t, err) - - err = SafeFileRotate(filepath.Join(tempdir, "registry"), - filepath.Join(tempdir, "registry.new")) - assert.NoError(t, err) - - contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry")) - assert.NoError(t, err) - assert.Equal(t, []byte("new filebeat 2"), contents) -} - func TestFileEventToMapStr(t *testing.T) { // Test 'fields' is not present when it is nil. 
event := FileEvent{} @@ -129,7 +34,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "text": "hello"}, - JSONConfig: &JSONConfig{KeysUnderRoot: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -142,7 +47,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "text": "hello"}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test", @@ -155,7 +60,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "text": "hello"}, - JSONConfig: &JSONConfig{}, + JSONConfig: &processor.JSONConfig{}, }, ExpectedItems: common.MapStr{ "json": common.MapStr{"type": "test", "text": "hello"}, @@ -168,7 +73,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "text": "hi"}, - JSONConfig: &JSONConfig{MessageKey: "text"}, + JSONConfig: &processor.JSONConfig{MessageKey: "text"}, }, ExpectedItems: common.MapStr{ "json": common.MapStr{"type": "test", "text": "hello"}, @@ -183,7 +88,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"), @@ -198,7 +103,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.Time(now), @@ -214,7 +119,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "test", "@timestamp": 42}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "@timestamp": common.Time(now), @@ -228,7 +133,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": 42}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -241,7 +146,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": ""}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: &processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", @@ -254,7 +159,7 @@ func TestFileEventToMapStrJSON(t *testing.T) { DocumentType: "test_type", Text: &text, JSONFields: common.MapStr{"type": "_type"}, - JSONConfig: &JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, + JSONConfig: 
&processor.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true}, }, ExpectedItems: common.MapStr{ "type": "test_type", diff --git a/filebeat/input/file.go b/filebeat/input/file/file.go similarity index 88% rename from filebeat/input/file.go rename to filebeat/input/file/file.go index c5d9e75a626f..25f8ca6b2b4a 100644 --- a/filebeat/input/file.go +++ b/filebeat/input/file/file.go @@ -1,4 +1,4 @@ -package input +package file import ( "os" @@ -6,15 +6,11 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -const ( - jsonErrorKey = "json_error" -) - type File struct { - File *os.File - FileInfo os.FileInfo - Path string - FileState *FileState + File *os.File + FileInfo os.FileInfo + Path string + State *State } // Check that the file isn't a symlink, mode is regular or file is nil diff --git a/filebeat/input/file_other.go b/filebeat/input/file/file_other.go similarity index 78% rename from filebeat/input/file_other.go rename to filebeat/input/file/file_other.go index 37521f616418..4c92313240ad 100644 --- a/filebeat/input/file_other.go +++ b/filebeat/input/file/file_other.go @@ -1,6 +1,6 @@ // +build !windows -package input +package file import ( "os" @@ -9,18 +9,18 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -type FileStateOS struct { +type StateOS struct { Inode uint64 `json:"inode,"` Device uint64 `json:"device,"` } -// GetOSFileState returns the FileStateOS for non windows systems -func GetOSFileState(info os.FileInfo) FileStateOS { +// GetOSState returns the FileStateOS for non windows systems +func GetOSState(info os.FileInfo) StateOS { stat := info.Sys().(*syscall.Stat_t) // Convert inode and dev to uint64 to be cross platform compatible - fileState := FileStateOS{ + fileState := StateOS{ Inode: uint64(stat.Ino), Device: uint64(stat.Dev), } @@ -29,7 +29,7 @@ func GetOSFileState(info os.FileInfo) FileStateOS { } // IsSame file checks if the files are identical -func (fs FileStateOS) IsSame(state FileStateOS) bool { +func (fs StateOS) IsSame(state StateOS) bool { return fs.Inode == state.Inode && fs.Device == state.Device } diff --git a/filebeat/input/file_other_test.go b/filebeat/input/file/file_other_test.go similarity index 87% rename from filebeat/input/file_other_test.go rename to filebeat/input/file/file_other_test.go index 76f8098c1c50..539579295f09 100644 --- a/filebeat/input/file_other_test.go +++ b/filebeat/input/file/file_other_test.go @@ -1,6 +1,6 @@ // +build !windows,!integration -package input +package file import ( "io/ioutil" @@ -17,7 +17,7 @@ func TestGetOSFileState(t *testing.T) { fileinfo, err := file.Stat() assert.Nil(t, err) - state := GetOSFileState(fileinfo) + state := GetOSState(fileinfo) assert.True(t, state.Inode > 0) assert.True(t, state.Device > 0) @@ -30,7 +30,7 @@ func TestGetOSFileStateStat(t *testing.T) { fileinfo, err := os.Stat(file.Name()) assert.Nil(t, err) - state := GetOSFileState(fileinfo) + state := GetOSState(fileinfo) assert.True(t, state.Inode > 0) assert.True(t, state.Device > 0) diff --git a/filebeat/input/file/file_test.go b/filebeat/input/file/file_test.go new file mode 100644 index 000000000000..289c6fbe56a8 --- /dev/null +++ b/filebeat/input/file/file_test.go @@ -0,0 +1,103 @@ +// +build !integration + +package file + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsSameFile(t *testing.T) { + absPath, err := filepath.Abs("../../tests/files/") + + assert.NotNil(t, absPath) + assert.Nil(t, err) + + fileInfo1, err := os.Stat(absPath + "/logs/test.log") + 
diff --git a/filebeat/input/file/file_test.go b/filebeat/input/file/file_test.go
new file mode 100644
index 000000000000..289c6fbe56a8
--- /dev/null
+++ b/filebeat/input/file/file_test.go
@@ -0,0 +1,103 @@
+// +build !integration
+
+package file
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIsSameFile(t *testing.T) {
+	absPath, err := filepath.Abs("../../tests/files/")
+
+	assert.NotNil(t, absPath)
+	assert.Nil(t, err)
+
+	fileInfo1, err := os.Stat(absPath + "/logs/test.log")
+	fileInfo2, err := os.Stat(absPath + "/logs/system.log")
+
+	assert.Nil(t, err)
+	assert.NotNil(t, fileInfo1)
+	assert.NotNil(t, fileInfo2)
+
+	file1 := &File{
+		FileInfo: fileInfo1,
+	}
+
+	file2 := &File{
+		FileInfo: fileInfo2,
+	}
+
+	file3 := &File{
+		FileInfo: fileInfo2,
+	}
+
+	assert.False(t, file1.IsSameFile(file2))
+	assert.False(t, file2.IsSameFile(file1))
+
+	assert.True(t, file1.IsSameFile(file1))
+	assert.True(t, file2.IsSameFile(file2))
+
+	assert.True(t, file3.IsSameFile(file2))
+	assert.True(t, file2.IsSameFile(file3))
+}
+
+func TestSafeFileRotateExistingFile(t *testing.T) {
+
+	tempdir, err := ioutil.TempDir("", "")
+	assert.NoError(t, err)
+	defer func() {
+		assert.NoError(t, os.RemoveAll(tempdir))
+	}()
+
+	// create an existing registry file
+	err = ioutil.WriteFile(filepath.Join(tempdir, "registry"),
+		[]byte("existing filebeat"), 0x777)
+	assert.NoError(t, err)
+
+	// create a new registry.new file
+	err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
+		[]byte("new filebeat"), 0x777)
+	assert.NoError(t, err)
+
+	// rotate registry.new into registry
+	err = SafeFileRotate(filepath.Join(tempdir, "registry"),
+		filepath.Join(tempdir, "registry.new"))
+	assert.NoError(t, err)
+
+	contents, err := ioutil.ReadFile(filepath.Join(tempdir, "registry"))
+	assert.NoError(t, err)
+	assert.Equal(t, []byte("new filebeat"), contents)
+
+	// do it again to make sure we deal with deleting the old file
+
+	err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
+		[]byte("new filebeat 1"), 0x777)
+	assert.NoError(t, err)
+
+	err = SafeFileRotate(filepath.Join(tempdir, "registry"),
+		filepath.Join(tempdir, "registry.new"))
+	assert.NoError(t, err)
+
+	contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
+	assert.NoError(t, err)
+	assert.Equal(t, []byte("new filebeat 1"), contents)
+
+	// and again for good measure
+
+	err = ioutil.WriteFile(filepath.Join(tempdir, "registry.new"),
+		[]byte("new filebeat 2"), 0x777)
+	assert.NoError(t, err)
+
+	err = SafeFileRotate(filepath.Join(tempdir, "registry"),
+		filepath.Join(tempdir, "registry.new"))
+	assert.NoError(t, err)
+
+	contents, err = ioutil.ReadFile(filepath.Join(tempdir, "registry"))
+	assert.NoError(t, err)
+	assert.Equal(t, []byte("new filebeat 2"), contents)
+}
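SafeFileRotate's implementation is not part of this diff; the test above only pins down its contract: the temp file must replace an existing target, repeatedly. A sketch of that contract under the assumption that it reduces to an os.Rename, with the target removed first on Windows (an assumption for illustration, not the beats code):

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"runtime"
)

// safeFileRotate sketches the behavior TestSafeFileRotateExistingFile
// checks: move tempfile onto path, replacing any existing file. The
// Windows branch assumes the target must be removed before the rename.
func safeFileRotate(path, tempfile string) error {
	if runtime.GOOS == "windows" {
		if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return os.Rename(tempfile, path)
}

func main() {
	dir, _ := ioutil.TempDir("", "rotate")
	defer os.RemoveAll(dir)

	reg := filepath.Join(dir, "registry")
	tmp := filepath.Join(dir, "registry.new")
	ioutil.WriteFile(reg, []byte("old"), 0644)
	ioutil.WriteFile(tmp, []byte("new"), 0644)

	if err := safeFileRotate(reg, tmp); err != nil {
		fmt.Println(err)
		return
	}
	contents, _ := ioutil.ReadFile(reg)
	fmt.Printf("%s\n", contents) // prints "new"
}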
diff --git a/filebeat/input/file_windows.go b/filebeat/input/file/file_windows.go
similarity index 93%
rename from filebeat/input/file_windows.go
rename to filebeat/input/file/file_windows.go
index 0fe3a9dd44bd..b6094a7648e5 100644
--- a/filebeat/input/file_windows.go
+++ b/filebeat/input/file/file_windows.go
@@ -1,4 +1,4 @@
-package input
+package file
 
 import (
 	"fmt"
@@ -9,14 +9,14 @@ import (
 	"github.com/elastic/beats/libbeat/logp"
 )
 
-type FileStateOS struct {
+type StateOS struct {
 	IdxHi uint64 `json:"idxhi,"`
 	IdxLo uint64 `json:"idxlo,"`
 	Vol   uint64 `json:"vol,"`
 }
 
-// GetOSFileState returns the platform specific FileStateOS
-func GetOSFileState(info os.FileInfo) FileStateOS {
+// GetOSState returns the platform specific StateOS
+func GetOSState(info os.FileInfo) StateOS {
 
 	// os.SameFile must be called to populate the id fields. Otherwise in case for example
 	// os.Stat(file) is used to get the fileInfo, the ids are empty.
@@ -31,7 +31,7 @@ func GetOSFileState(info os.FileInfo) FileStateOS {
 	// More details can be found here: https://msdn.microsoft.com/en-us/library/aa363788(v=vs.85).aspx
 	// Uint should already return uint64, but making sure this is the case
 	// The required fiels can be found here: https://github.com/golang/go/blob/master/src/os/types_windows.go#L78
-	fileState := FileStateOS{
+	fileState := StateOS{
 		IdxHi: uint64(fileStat.FieldByName("idxhi").Uint()),
 		IdxLo: uint64(fileStat.FieldByName("idxlo").Uint()),
 		Vol:   uint64(fileStat.FieldByName("vol").Uint()),
@@ -41,7 +41,7 @@ func GetOSFileState(info os.FileInfo) FileStateOS {
 }
 
 // IsSame file checks if the files are identical
-func (fs FileStateOS) IsSame(state FileStateOS) bool {
+func (fs StateOS) IsSame(state StateOS) bool {
 	return fs.IdxHi == state.IdxHi && fs.IdxLo == state.IdxLo && fs.Vol == state.Vol
 }
 
diff --git a/filebeat/input/file_windows_test.go b/filebeat/input/file/file_windows_test.go
similarity index 76%
rename from filebeat/input/file_windows_test.go
rename to filebeat/input/file/file_windows_test.go
index 93de9215298a..d77643a21b7c 100644
--- a/filebeat/input/file_windows_test.go
+++ b/filebeat/input/file/file_windows_test.go
@@ -1,6 +1,6 @@
 // +build !integration
 
-package input
+package file
 
 import (
 	"io/ioutil"
@@ -10,28 +10,28 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-func TestGetOSFileState(t *testing.T) {
+func TestGetOSState(t *testing.T) {
 	file, err := ioutil.TempFile("", "")
 	assert.Nil(t, err)
 
	fileinfo, err := file.Stat()
 	assert.Nil(t, err)
 
-	state := GetOSFileState(fileinfo)
+	state := GetOSState(fileinfo)
 
 	assert.True(t, state.IdxHi > 0)
 	assert.True(t, state.IdxLo > 0)
 	assert.True(t, state.Vol > 0)
 }
 
-func TestGetOSFileStateStat(t *testing.T) {
+func TestGetOSStateStat(t *testing.T) {
 	file, err := ioutil.TempFile("", "")
 	assert.Nil(t, err)
 
 	fileinfo, err := os.Stat(file.Name())
 	assert.Nil(t, err)
 
-	state := GetOSFileState(fileinfo)
+	state := GetOSState(fileinfo)
 
 	assert.True(t, state.IdxHi > 0)
 	assert.True(t, state.IdxLo > 0)
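The Windows variant above has to call os.SameFile first so the identity fields get populated, and then pulls idxhi/idxlo/vol out via reflection. In a standalone program, the portable way to ask the same question is os.SameFile itself; a small sketch (a.log and b.log are hypothetical paths):

package main

import (
	"fmt"
	"os"
)

// os.SameFile compares OS-level file identity without any reflection:
// inode/device on Unix, volume serial plus file index on Windows. This is
// the portable counterpart to StateOS.IsSame.
func main() {
	a, err := os.Stat("a.log")
	if err != nil {
		fmt.Println(err)
		return
	}
	b, err := os.Stat("b.log")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("same file:", os.SameFile(a, b))
}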
diff --git a/filebeat/input/state.go b/filebeat/input/file/state.go
similarity index 79%
rename from filebeat/input/state.go
rename to filebeat/input/file/state.go
index 7a64d0b6205e..2c1ba908bc45 100644
--- a/filebeat/input/state.go
+++ b/filebeat/input/file/state.go
@@ -1,4 +1,4 @@
-package input
+package file
 
 import (
 	"os"
@@ -8,41 +8,41 @@ import (
 	"github.com/elastic/beats/libbeat/logp"
 )
 
-// FileState is used to communicate the reading state of a file
-type FileState struct {
+// State is used to communicate the reading state of a file
+type State struct {
 	Source      string      `json:"source"`
 	Offset      int64       `json:"offset"`
 	Finished    bool        `json:"-"` // harvester state
 	Fileinfo    os.FileInfo `json:"-"` // the file info
-	FileStateOS FileStateOS
+	FileStateOS StateOS
 	LastSeen    time.Time   `json:"last_seen"`
 }
 
-// NewFileState creates a new file state
-func NewFileState(fileInfo os.FileInfo, path string) FileState {
-	return FileState{
+// NewState creates a new file state
+func NewState(fileInfo os.FileInfo, path string) State {
+	return State{
 		Fileinfo:    fileInfo,
 		Source:      path,
 		Finished:    false,
-		FileStateOS: GetOSFileState(fileInfo),
+		FileStateOS: GetOSState(fileInfo),
 		LastSeen:    time.Now(),
 	}
 }
 
 // States handles list of FileState
 type States struct {
-	states []FileState
+	states []State
 	mutex  sync.Mutex
 }
 
 func NewStates() *States {
 	return &States{
-		states: []FileState{},
+		states: []State{},
 	}
 }
 
 // Update updates a state. If previous state didn't exist, new one is created
-func (s *States) Update(newState FileState) {
+func (s *States) Update(newState State) {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
@@ -58,7 +58,7 @@ func (s *States) Update(newState FileState) {
 	}
 }
 
-func (s *States) FindPrevious(newState FileState) (int, FileState) {
+func (s *States) FindPrevious(newState State) (int, State) {
 	// TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
@@ -67,7 +67,7 @@ func (s *States) FindPrevious(newState FileState) (int, FileState) {
 
 // findPreviousState returns the previous state fo the file
 // In case no previous state exists, index -1 is returned
-func (s *States) findPrevious(newState FileState) (int, FileState) {
+func (s *States) findPrevious(newState State) (int, State) {
 	// TODO: This could be made potentially more performance by using an index (harvester id) and only use iteration as fall back
 	for index, oldState := range s.states {
@@ -77,7 +77,7 @@ func (s *States) findPrevious(newState FileState) (int, FileState) {
 		}
 	}
 
-	return -1, FileState{}
+	return -1, State{}
 }
 
 // Cleanup cleans up the state array. All states which are older then `older` are removed
@@ -104,18 +104,18 @@ func (s *States) Count() int {
 }
 
 // Returns a copy of the file states
-func (s *States) GetStates() []FileState {
+func (s *States) GetStates() []State {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 
-	newStates := make([]FileState, len(s.states))
+	newStates := make([]State, len(s.states))
 	copy(newStates, s.states)
 
 	return newStates
}
 
 // SetStates overwrites all internal states with the given states array
-func (s *States) SetStates(states []FileState) {
+func (s *States) SetStates(states []State) {
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	s.states = states
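Taken together, the renamed API reads naturally from a caller's perspective. A usage sketch based only on the signatures visible in this patch — NewStates, NewState, FindPrevious, Update, Count — against a hypothetical log path; it should compile against this revision of the package:

package main

import (
	"fmt"
	"os"

	"github.com/elastic/beats/filebeat/input/file"
)

func main() {
	states := file.NewStates()

	// Build a State for a discovered path (the path is a stand-in).
	info, err := os.Stat("/var/log/system.log")
	if err != nil {
		fmt.Println(err)
		return
	}
	newState := file.NewState(info, "/var/log/system.log")

	// Look up the predecessor by OS-level file identity; -1 means none.
	index, old := states.FindPrevious(newState)
	if index == -1 {
		fmt.Println("no previous state, starting fresh")
	} else {
		fmt.Println("resuming at offset", old.Offset)
	}

	// Store the update and report how many states are tracked.
	states.Update(newState)
	fmt.Println("tracked states:", states.Count())
}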
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index e586e6b8a14c..287f378c0ef5 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -8,6 +8,7 @@ import (
 	cfg "github.com/elastic/beats/filebeat/config"
 	"github.com/elastic/beats/filebeat/harvester"
 	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 )
@@ -19,7 +20,7 @@ type Prospector struct {
 	spoolerChan   chan *input.FileEvent
 	harvesterChan chan *input.FileEvent
 	done          chan struct{}
-	states        *input.States
+	states        *file.States
 	wg            sync.WaitGroup
 }
@@ -28,7 +29,7 @@ type Prospectorer interface {
 	Run()
 }
 
-func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
+func NewProspector(cfg *common.Config, states file.States, spoolerChan chan *input.FileEvent) (*Prospector, error) {
 	prospector := &Prospector{
 		cfg:         cfg,
 		config:      defaultConfig,
@@ -79,7 +80,7 @@ func (p *Prospector) Init() error {
 	p.prospectorer = prospectorer
 
 	// Create empty harvester to check if configs are fine
-	_, err = p.createHarvester(input.FileState{})
+	_, err = p.createHarvester(file.State{})
 	if err != nil {
 		return err
 	}
@@ -137,7 +138,7 @@ func (p *Prospector) Stop() {
 }
 
 // createHarvester creates a new harvester instance from the given state
-func (p *Prospector) createHarvester(state input.FileState) (*harvester.Harvester, error) {
+func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) {
 
 	h, err := harvester.NewHarvester(
 		p.cfg,
@@ -151,7 +152,7 @@ func (p *Prospector) createHarvester(state input.FileState) (*harvester.Harveste
 	return h, err
 }
 
-func (p *Prospector) startHarvester(state input.FileState, offset int64) (*harvester.Harvester, error) {
+func (p *Prospector) startHarvester(state file.State, offset int64) (*harvester.Harvester, error) {
 	state.Offset = offset
 	// Create harvester with state
 	h, err := p.createHarvester(state)
diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go
index c9cce65ade7a..02f27f4fe717 100644
--- a/filebeat/prospector/prospector_log.go
+++ b/filebeat/prospector/prospector_log.go
@@ -6,7 +6,7 @@ import (
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester"
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/libbeat/logp"
 )
@@ -108,12 +108,12 @@ func (p *ProspectorLog) scan() {
 	// TODO: Track harvesters to prevent any file from being harvested twice. Finished state could be delayed?
 	// Now let's do one quick scan to pick up new files
-	for file, fileinfo := range p.getFiles() {
+	for f, fileinfo := range p.getFiles() {
 
-		logp.Debug("prospector", "Check file for harvesting: %s", file)
+		logp.Debug("prospector", "Check file for harvesting: %s", f)
 
 		// Create new state for comparison
-		newState := input.NewFileState(fileinfo, file)
+		newState := file.NewState(fileinfo, f)
 
 		// Load last state
 		index, lastState := p.Prospector.states.FindPrevious(newState)
@@ -130,7 +130,7 @@ func (p *ProspectorLog) scan() {
 }
 
 // harvestNewFile harvest a new file
-func (p *ProspectorLog) harvestNewFile(state input.FileState) {
+func (p *ProspectorLog) harvestNewFile(state file.State) {
 
 	if !p.isIgnoreOlder(state) {
 		logp.Debug("prospector", "Start harvester for new file: %s", state.Source)
@@ -141,7 +141,7 @@ func (p *ProspectorLog) harvestNewFile(state input.FileState) {
 }
 
 // harvestExistingFile continues harvesting a file with a known state if needed
-func (p *ProspectorLog) harvestExistingFile(newState input.FileState, oldState input.FileState) {
+func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.State) {
 
 	logp.Debug("prospector", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)
@@ -178,7 +178,7 @@ func (p *ProspectorLog) isFileExcluded(file string) bool {
 }
 
 // isIgnoreOlder checks if the given state reached ignore_older
-func (p *ProspectorLog) isIgnoreOlder(state input.FileState) bool {
+func (p *ProspectorLog) isIgnoreOlder(state file.State) bool {
 
 	// ignore_older is disable
 	if p.config.IgnoreOlder == 0 {
diff --git a/filebeat/prospector/prospector_stdin.go b/filebeat/prospector/prospector_stdin.go
index c8fe696293c3..2b7fd2c6667c 100644
--- a/filebeat/prospector/prospector_stdin.go
+++ b/filebeat/prospector/prospector_stdin.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 
 	"github.com/elastic/beats/filebeat/harvester"
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/filebeat/input/file"
 )
 
 type ProspectorStdin struct {
@@ -20,7 +20,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {
 
 	var err error
 
-	prospectorer.harvester, err = p.createHarvester(input.FileState{Source: "-"})
+	prospectorer.harvester, err = p.createHarvester(file.State{Source: "-"})
 	if err != nil {
 		return nil, fmt.Errorf("Error initializing stdin harvester: %v", err)
 	}
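The scan loop above is the core decision point: build a State for each discovered path, look for a previous state, then either start a fresh harvester or resume. A toy, self-contained mirror of that decision (the local State type and matching by Source are simplifications; the real FindPrevious compares OS-level file identity, not paths):

package main

import "fmt"

// State is a simplified stand-in for file.State.
type State struct {
	Source string
	Offset int64
}

// findPrevious mirrors States.findPrevious: return the index and value of
// a known state, or -1 when none exists.
func findPrevious(states []State, s State) (int, State) {
	for i, old := range states {
		if old.Source == s.Source {
			return i, old
		}
	}
	return -1, State{}
}

func main() {
	known := []State{{Source: "/var/log/app.log", Offset: 1024}}
	discovered := []string{"/var/log/app.log", "/var/log/new.log"}

	for _, path := range discovered {
		newState := State{Source: path}
		if index, last := findPrevious(known, newState); index == -1 {
			fmt.Println("start harvester for new file:", path)
		} else {
			fmt.Println("resume", path, "at offset", last.Offset)
		}
	}
}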
"github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/filebeat/input" . "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" ) @@ -19,8 +19,8 @@ import ( type Registrar struct { Channel chan []*FileEvent done chan struct{} - registryFile string // Path to the Registry File - states *input.States // Map with all file paths inside and the corresponding state + registryFile string // Path to the Registry File + states *file.States // Map with all file paths inside and the corresponding state wg sync.WaitGroup } @@ -29,7 +29,7 @@ func New(registryFile string) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, done: make(chan struct{}), - states: input.NewStates(), + states: file.NewStates(), Channel: make(chan []*FileEvent, 1), wg: sync.WaitGroup{}, } @@ -63,7 +63,7 @@ func (r *Registrar) Init() error { } // GetStates return the registrar states -func (r *Registrar) GetStates() input.States { +func (r *Registrar) GetStates() file.States { return *r.states } @@ -83,24 +83,28 @@ func (r *Registrar) loadStates() error { return nil } - file, err := os.Open(r.registryFile) + f, err := os.Open(r.registryFile) if err != nil { return err } - defer file.Close() + defer f.Close() logp.Info("Loading registrar data from %s", r.registryFile) // DEPRECATED: This should be removed in 6.0 - oldStates := r.loadAndConvertOldState(file) + oldStates := r.loadAndConvertOldState(f) if oldStates { return nil } - decoder := json.NewDecoder(file) - states := []input.FileState{} - decoder.Decode(&states) + decoder := json.NewDecoder(f) + states := []file.State{} + err = decoder.Decode(&states) + if err != nil { + logp.Err("Error decoding states: %s", err) + return err + } r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) @@ -110,12 +114,12 @@ func (r *Registrar) loadStates() error { // loadAndConvertOldState loads the old state file and converts it to the new state // This is designed so it can be easily removed in later versions -func (r *Registrar) loadAndConvertOldState(file *os.File) bool { +func (r *Registrar) loadAndConvertOldState(f *os.File) bool { // Make sure file reader is reset afterwards - defer file.Seek(0, 0) + defer f.Seek(0, 0) - decoder := json.NewDecoder(file) - oldStates := map[string]FileState{} + decoder := json.NewDecoder(f) + oldStates := map[string]file.State{} err := decoder.Decode(&oldStates) if err != nil { @@ -129,7 +133,7 @@ func (r *Registrar) loadAndConvertOldState(file *os.File) bool { } // Convert old states to new states - states := make([]input.FileState, len(oldStates)) + states := make([]file.State, len(oldStates)) logp.Info("Old registry states found: %v", len(oldStates)) counter := 0 for _, state := range oldStates { @@ -214,21 +218,25 @@ func (r *Registrar) writeRegistry() error { logp.Debug("registrar", "Write registry file: %s", r.registryFile) tempfile := r.registryFile + ".new" - file, e := os.Create(tempfile) - if e != nil { - logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, e) - return e + f, err := os.Create(tempfile) + if err != nil { + logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err) + return err } states := r.states.GetStates() - encoder := json.NewEncoder(file) - encoder.Encode(states) + encoder := json.NewEncoder(f) + err = encoder.Encode(states) + if err != nil { + logp.Err("Error when encoding the states: %s", err) + return err + } // 
 	// Directly close file because of windows
-	file.Close()
+	f.Close()
 
 	logp.Info("Registry file updated. %d states written.", len(states))
 
-	return SafeFileRotate(r.registryFile, tempfile)
+	return file.SafeFileRotate(r.registryFile, tempfile)
 }
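writeRegistry now surfaces decode and encode errors instead of silently dropping them, and the overall shape is write-to-temp, close, then rotate into place. A reduced standard-library sketch of the same pattern (the state layout and registry path are stand-ins):

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeRegistry sketches the write-then-rotate pattern above: encode the
// states into a temp file, close it, then move it onto the registry path.
func writeRegistry(path string, states []map[string]interface{}) error {
	tempfile := path + ".new"

	f, err := os.Create(tempfile)
	if err != nil {
		return fmt.Errorf("failed to create tempfile %s: %v", tempfile, err)
	}

	if err := json.NewEncoder(f).Encode(states); err != nil {
		f.Close()
		return fmt.Errorf("error encoding states: %v", err)
	}

	// Close before renaming; an open handle would block the rename on Windows.
	if err := f.Close(); err != nil {
		return err
	}

	return os.Rename(tempfile, path)
}

func main() {
	dir, _ := ioutil.TempDir("", "registry")
	defer os.RemoveAll(dir)

	states := []map[string]interface{}{{"source": "/var/log/app.log", "offset": 42}}
	if err := writeRegistry(filepath.Join(dir, "registry"), states); err != nil {
		fmt.Println(err)
	}
}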