Skip to content

Commit

Permalink
Fix flow timestamp update (#4968)
Browse files Browse the repository at this point in the history
(cherry picked from commit cbbd9ea)
  • Loading branch information
Steffen Siering authored and tsg committed Aug 24, 2017
1 parent 8f61e47 commit 4b5d91f
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di

*Packetbeat*

- Update flow timestamp on each packet being received. {issue}4895[4895]

*Winlogbeat*

==== Added
Expand Down
5 changes: 4 additions & 1 deletion packetbeat/flows/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func (t *flowMetaTable) get(id *FlowID, counter *counterReg) Flow {
}

func (t *flowTable) get(id *FlowID, counter *counterReg) Flow {
ts := time.Now()

t.mutex.Lock()
defer t.mutex.Unlock()

Expand All @@ -66,13 +68,14 @@ func (t *flowTable) get(id *FlowID, counter *counterReg) Flow {
if bf == nil || !bf.isAlive() {
debugf("create new flow")

bf = newBiFlow(id.rawFlowID.clone(), time.Now(), id.dir)
bf = newBiFlow(id.rawFlowID.clone(), ts, id.dir)
t.table[string(bf.id.flowID)] = bf
t.flows.append(bf)
} else if bf.dir != id.dir {
dir = flowDirReversed
}

bf.ts = ts
stats := bf.stats[dir]
if stats == nil {
stats = newFlowStats(counter)
Expand Down
9 changes: 9 additions & 0 deletions packetbeat/tests/system/test_0060_flows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from packetbeat import (BaseTest, FLOWS_REQUIRED_FIELDS)
from pprint import PrettyPrinter
from datetime import datetime
import six


Expand All @@ -11,6 +12,10 @@ def check_fields(flow, fields):
assert flow[k] == v


def parse_timestamp(ts):
return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")


class Test(BaseTest):

def test_mysql_flow(self):
Expand Down Expand Up @@ -43,6 +48,10 @@ def test_mysql_flow(self):
'dest.stats.net_bytes_total': 181133,
})

start_ts = parse_timestamp(objs[0]['start_time'])
last_ts = parse_timestamp(objs[0]['last_time'])
assert last_ts > start_ts

def test_memcache_udp_flow(self):
self.render_config_template(
flows=True,
Expand Down

0 comments on commit 4b5d91f

Please sign in to comment.