Skip to content

Commit

Permalink
[Filebeat] Fix offset field pointing at end of line for elastic#6514 (e…
Browse files Browse the repository at this point in the history
…lastic#7345)

(cherry picked from commit becec42)
  • Loading branch information
ph authored and ruflin committed Jun 18, 2018
1 parent df21d50 commit 470c0d4
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]
- Fix offset field pointing at end of a line. {issue}6514[6514]
*Heartbeat*
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (h *Harvester) Run() error {
// This is important in case sending is not successful so on shutdown
// the old offset is reported
state := h.getState()
startingOffset := state.Offset
state.Offset += int64(message.Bytes)

// Create state event
Expand All @@ -281,7 +282,7 @@ func (h *Harvester) Run() error {
if !message.IsEmpty() && h.shouldExportLine(text) {
fields := common.MapStr{
"source": state.Source,
"offset": state.Offset, // Offset here is the offset before the starting char.
"offset": startingOffset, // Offset here is the offset before the starting char.
}
fields.DeepUpdate(message.Fields)

Expand Down
9 changes: 7 additions & 2 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ def test_shutdown_wait_ok(self):

# we allow for a potential race in the harvester shutdown here.
# In some cases the registry offset might match the penultimate offset.
assert (offset == outputs[-1]["offset"] or
offset == outputs[-2]["offset"])

eol_offset = 1
if os.name == "nt":
eol_offset += 1

assert (offset == (outputs[-1]["offset"] + eol_offset + len(outputs[-1]["message"])) or
offset == (outputs[-2]["offset"] + eol_offset + len(outputs[-2]["message"])))

def test_shutdown_wait_timeout(self):
"""
Expand Down

0 comments on commit 470c0d4

Please sign in to comment.