diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a6d46a5d263..e49853d4313 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202] - Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281] - Optimize PostgreSQL ingest pipeline to use anchored regexp and merge multiple regexp into a single expression. {pull}7269[7269] +- Fix offset field pointing at end of a line. {issue}6514[6514] *Heartbeat* - Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616] diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index c42e5fa6ecd..4022304d5ea 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -267,6 +267,7 @@ func (h *Harvester) Run() error { // This is important in case sending is not successful so on shutdown // the old offset is reported state := h.getState() + startingOffset := state.Offset state.Offset += int64(message.Bytes) // Create state event @@ -281,7 +282,7 @@ func (h *Harvester) Run() error { if !message.IsEmpty() && h.shouldExportLine(text) { fields := common.MapStr{ "source": state.Source, - "offset": state.Offset, // Offset here is the offset before the starting char. + "offset": startingOffset, // Offset here is the offset before the starting char. } fields.DeepUpdate(message.Fields) diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index b362414f3f4..4577b61be45 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -64,8 +64,13 @@ def test_shutdown_wait_ok(self): # we allow for a potential race in the harvester shutdown here. # In some cases the registry offset might match the penultimate offset. - assert (offset == outputs[-1]["offset"] or - offset == outputs[-2]["offset"]) + + eol_offset = 1 + if os.name == "nt": + eol_offset += 1 + + assert (offset == (outputs[-1]["offset"] + eol_offset + len(outputs[-1]["message"])) or + offset == (outputs[-2]["offset"] + eol_offset + len(outputs[-2]["message"]))) def test_shutdown_wait_timeout(self): """