Skip to content

Commit

Permalink
Store offset in log.offset field of events from the filestream input (#…
Browse files Browse the repository at this point in the history
…27688)

## What does this PR do?

This PR fixes the `FileMetaReader`, which previously stored only the size of the message in the `log.offset` field instead of the message's offset in the file. The fix affects only the filestream input; the log input is not impacted.

(cherry picked from commit e081819)
  • Loading branch information
kvch authored and mergify-bot committed Sep 1, 2021
1 parent 4424f3d commit 84fd9a5
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixes a bug in `http_endpoint` that caused numbers to be encoded as strings. {issue}27382[27382] {pull}27480[27480]
- Update indentation for azure filebeat configuration. {pull}26604[26604]
- Auditd: Fix Top Exec Commands dashboard visualization. {pull}27638[27638]
- Store offset in `log.offset` field of events from the filestream input. {pull}27688[27688]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)

r = readfile.NewFilemeta(r, fs.newPath)
r = readfile.NewFilemeta(r, fs.newPath, offset)

r = inp.parsers.Create(r)

Expand Down
13 changes: 8 additions & 5 deletions libbeat/reader/readfile/metafields.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,36 @@ import (
// FileMetaReader wraps a reader and adds the file path and the running
// offset to the fields of each non-empty message it returns.
type FileMetaReader struct {
// reader is the wrapped reader that messages are pulled from.
reader reader.Reader
// path is the value stored in the log.path field.
path string
// offset starts at the value given to NewFilemeta and grows by the byte
// count of every message read; it is stored in the log.offset field.
offset int64
}

// NewFilemeta creates a new reader that wraps the input reader and adds
// file metadata (path and offset) to the messages it returns.
func NewFilemeta(r reader.Reader, path string) reader.Reader {
return &FileMetaReader{r, path}
func NewFilemeta(r reader.Reader, path string, offset int64) reader.Reader {
return &FileMetaReader{r, path, offset}
}

// Next reads the next message from the wrapped reader and, unless the
// message is empty, adds the file path and offset to its fields.
func (r FileMetaReader) Next() (reader.Message, error) {
func (r *FileMetaReader) Next() (reader.Message, error) {
message, err := r.reader.Next()

r.offset += int64(message.Bytes)

// if the message is empty, there is no need to enrich it with file metadata
if message.IsEmpty() {
return message, err
}

message.Fields.DeepUpdate(common.MapStr{
"log": common.MapStr{
"offset": message.Bytes,
"offset": r.offset,
"path": r.path,
},
})
return message, err
}

func (r FileMetaReader) Close() error {
func (r *FileMetaReader) Close() error {
return r.reader.Close()
}
95 changes: 95 additions & 0 deletions libbeat/reader/readfile/metafields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import (
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader"
)

// TestMetaFieldsOffset checks that FileMetaReader keeps a running total of the
// bytes of every message it reads and stores that total in log.offset. A
// message with empty content still advances the offset but is expected to
// come back without any metadata fields.
func TestMetaFieldsOffset(t *testing.T) {
	messages := []reader.Message{
		{
			Content: []byte("my line"),
			Bytes:   7,
			Fields:  common.MapStr{},
		},
		{
			Content: []byte("my line again"),
			Bytes:   13,
			Fields:  common.MapStr{},
		},
		{
			// No content: enrichment is skipped, but the bytes still count.
			Content: []byte(""),
			Bytes:   10,
			Fields:  common.MapStr{},
		},
	}

	path := "test/path"
	offset := int64(0)
	in := &FileMetaReader{msgReader(messages), path, offset}

	read := 0
	for {
		msg, err := in.Next()
		if err == io.EOF {
			break
		}
		// Any other error is unexpected; fail here instead of asserting on a
		// message that was never produced.
		require.NoError(t, err)
		read++

		offset += int64(msg.Bytes)

		expectedFields := common.MapStr{}
		if len(msg.Content) != 0 {
			expectedFields = common.MapStr{
				"log": common.MapStr{
					"path":   path,
					"offset": offset,
				},
			}
		}
		require.Equal(t, expectedFields, msg.Fields)
		require.Equal(t, offset, in.offset)
	}

	// Guard against the loop ending early and the assertions never running.
	require.Equal(t, len(messages), read)
}

// msgReader wraps the given messages in a reader.Reader backed by a
// messageReader positioned at the first message.
func msgReader(m []reader.Message) reader.Reader {
	src := new(messageReader)
	src.messages = m
	return src
}

// messageReader is a test reader that hands out a fixed list of messages.
type messageReader struct {
// messages holds the messages returned by Next, in order.
messages []reader.Message
// i is the index of the next message to return.
i int
}

// Next returns the message at the current position and advances by one.
// Once every message has been returned it yields an empty message and io.EOF.
func (r *messageReader) Next() (reader.Message, error) {
	idx := r.i
	if idx == len(r.messages) {
		return reader.Message{}, io.EOF
	}

	r.i = idx + 1
	return r.messages[idx], nil
}

// Close does nothing and always reports success; the reader holds no
// resources that need releasing.
func (*messageReader) Close() error {
	return nil
}

0 comments on commit 84fd9a5

Please sign in to comment.