From d6b6ae95a26c71eaddc62ec5157b5306cb2590f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 1 Sep 2021 16:55:46 +0200 Subject: [PATCH] Store offset in log.offset field of events from the filestream input (#27688) ## What does this PR do? This PR fixes the `FileMetaReader`, which previously wrote only the size of the message to the `log.offset` field instead of the message's offset in the file. It does not affect the log input, only the filestream input. (cherry picked from commit e0818199d6c302abce096206de4dad51bdbaedaa) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/filestream/input.go | 2 +- libbeat/reader/readfile/metafields.go | 13 +-- libbeat/reader/readfile/metafields_test.go | 95 ++++++++++++++++++++++ 4 files changed, 105 insertions(+), 6 deletions(-) create mode 100644 libbeat/reader/readfile/metafields_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8bb496a24186..ee90eb25eff0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -192,6 +192,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixes the Snyk module to work with the new API changes. {pull}27358[27358] - Fixes a bug in `http_endpoint` that caused numbers encoded as strings. {issue}27382[27382] {pull}27480[27480] - Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638] +- Store offset in `log.offset` field of events from the filestream input. 
{pull}27688[27688] *Heartbeat* diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 48d5f6dca77c..ec051273171d 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -219,7 +219,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) - r = readfile.NewFilemeta(r, fs.newPath) + r = readfile.NewFilemeta(r, fs.newPath, offset) r = inp.parsers.Create(r) diff --git a/libbeat/reader/readfile/metafields.go b/libbeat/reader/readfile/metafields.go index 5039484e8fd4..8d6c34eca63d 100644 --- a/libbeat/reader/readfile/metafields.go +++ b/libbeat/reader/readfile/metafields.go @@ -27,19 +27,22 @@ import ( type FileMetaReader struct { reader reader.Reader path string + offset int64 } // New creates a new Encode reader from input reader by applying // the given codec. -func NewFilemeta(r reader.Reader, path string) reader.Reader { - return &FileMetaReader{r, path} +func NewFilemeta(r reader.Reader, path string, offset int64) reader.Reader { + return &FileMetaReader{r, path, offset} } // Next reads the next line from it's initial io.Reader // This converts a io.Reader to a reader.reader -func (r FileMetaReader) Next() (reader.Message, error) { +func (r *FileMetaReader) Next() (reader.Message, error) { message, err := r.reader.Next() + r.offset += int64(message.Bytes) + // if the message is empty, there is no need to enrich it with file metadata if message.IsEmpty() { return message, err @@ -47,13 +50,13 @@ func (r FileMetaReader) Next() (reader.Message, error) { message.Fields.DeepUpdate(common.MapStr{ "log": common.MapStr{ - "offset": message.Bytes, + "offset": r.offset, "path": r.path, }, }) return message, err } -func (r FileMetaReader) Close() error { +func (r *FileMetaReader) Close() error { return r.reader.Close() } diff --git a/libbeat/reader/readfile/metafields_test.go 
b/libbeat/reader/readfile/metafields_test.go new file mode 100644 index 000000000000..eb198a776c05 --- /dev/null +++ b/libbeat/reader/readfile/metafields_test.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package readfile + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader" +) + +func TestMetaFieldsOffset(t *testing.T) { + messages := []reader.Message{ + reader.Message{ + Content: []byte("my line"), + Bytes: 7, + Fields: common.MapStr{}, + }, + reader.Message{ + Content: []byte("my line again"), + Bytes: 13, + Fields: common.MapStr{}, + }, + reader.Message{ + Content: []byte(""), + Bytes: 10, + Fields: common.MapStr{}, + }, + } + + path := "test/path" + offset := int64(0) + in := &FileMetaReader{msgReader(messages), path, offset} + for { + msg, err := in.Next() + if err == io.EOF { + break + } + offset += int64(msg.Bytes) + + expectedFields := common.MapStr{} + if len(msg.Content) != 0 { + expectedFields = common.MapStr{ + "log": common.MapStr{ + "path": path, + "offset": offset, + }, + } + } + require.Equal(t, expectedFields, msg.Fields) + require.Equal(t, offset, in.offset) + } +} + 
+func msgReader(m []reader.Message) reader.Reader { + return &messageReader{ + messages: m, + } +} + +type messageReader struct { + messages []reader.Message + i int +} + +func (r *messageReader) Next() (reader.Message, error) { + if r.i == len(r.messages) { + return reader.Message{}, io.EOF + } + msg := r.messages[r.i] + r.i++ + return msg, nil +} + +func (r *messageReader) Close() error { + return nil +}