Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand key for template cache in IPFix and v9 #167

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ipfix/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,19 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
if setHeader.SetID > 255 {
var ok bool
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr); !ok {
if tr, ok = mem.retrieve(setHeader.SetID, d.raddr, msg.Header.DomainID); !ok {
select {
case rpcChan <- RPCRequest{
ID: setHeader.SetID,
IP: d.raddr,
SrcID: msg.Header.DomainID,
}:
default:
}
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d",
err = nonfatalError{fmt.Errorf("%s unknown ipfix template id# %d with domain ID %d",
d.raddr.String(),
setHeader.SetID,
msg.Header.DomainID,
)}
}
}
Expand All @@ -196,7 +198,7 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
err = tr.unmarshalOpts(d.reader)
}
if err == nil {
mem.insert(tr.TemplateID, d.raddr, tr)
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.DomainID)
}
} else if setID >= 4 && setID <= 255 {
// Reserved set, do not read any records
Expand Down
6 changes: 3 additions & 3 deletions ipfix/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
var tpl, optsTpl, multiMessage, unknownDatasetMessage []byte

func init() {
// IPFIX packet including template SetID:2, 25 fields
// IPFIX packet including template SetID:2, 25 fields, Domain id 33792
tpl = []byte{
0x0, 0xa, 0x0, 0x7c, 0x58, 0x90, 0xd6, 0x40, 0x28, 0xf7,
0xa0, 0x4a, 0x0, 0x0, 0x84, 0x0, 0x0, 0x2, 0x0, 0x6c, 0x1,
Expand Down Expand Up @@ -205,8 +205,8 @@ func TestUnknownDatasetsMessage(t *testing.T) {
t.Error("Did not expect any result datasets, but got", l)
}
expectedErrorStr := `Multiple errors:
- 127.0.0.1 unknown ipfix template id# 264
- 127.0.0.1 unknown ipfix template id# 264`
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1
- 127.0.0.1 unknown ipfix template id# 264 with domain ID 1`
if err == nil || err.Error() != expectedErrorStr {
t.Error("Received unexpected erorr:", err)
}
Expand Down
23 changes: 14 additions & 9 deletions ipfix/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,32 @@ func GetCache(cacheFile string) MemCache {
return m
}

func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, id)
key := append(addr, b...)

func (m MemCache) getShard(templateId uint16, addr net.IP, domainId uint32) (*TemplatesShard, uint32) {
var key []byte
hash := fnv.New32()
dId := make([]byte, 4)
tID := make([]byte, 2)
binary.LittleEndian.PutUint32(dId, domainId)
binary.BigEndian.PutUint16(tID, templateId)
key = append(key, addr...)
key = append(key, dId...)
key = append(key, tID...)

hash.Write(key)
hSum32 := hash.Sum32()

return m[uint(hSum32)%uint(shardNo)], hSum32
}

func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
shard, key := m.getShard(id, addr)
func (m MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, domainID uint32) {
shard, key := m.getShard(id, addr, domainID)
shard.Lock()
defer shard.Unlock()
shard.Templates[key] = Data{tr, time.Now().Unix()}
}

func (m MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr)
func (m MemCache) retrieve(id uint16, addr net.IP, domainID uint32) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr, domainID)
shard.RLock()
defer shard.RUnlock()
v, ok := shard.Templates[key]
Expand Down
5 changes: 3 additions & 2 deletions ipfix/memcache_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type RPCConfig struct {
type RPCRequest struct {
ID uint16
IP net.IP
SrcID uint32
}

type vFlowServer struct {
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewRPC(mCache MemCache) *IRPC {
func (r *IRPC) Get(req RPCRequest, resp *TemplateRecord) error {
var ok bool

*resp, ok = r.mCache.retrieve(req.ID, req.IP)
*resp, ok = r.mCache.retrieve(req.ID, req.IP, req.SrcID)
if !ok {
return errNotAvail
}
Expand Down Expand Up @@ -168,7 +169,7 @@ func RPC(m MemCache, config *RPCConfig) {
continue
}

m.insert(req.ID, req.IP, *tr)
m.insert(req.ID, req.IP, *tr, req.SrcID)
break
}

Expand Down
42 changes: 36 additions & 6 deletions ipfix/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMemCacheRetrieve(t *testing.T) {
mCache := GetCache("cache.file")
d := NewDecoder(ip, tpl)
d.Decode(mCache)
v, ok := mCache.retrieve(256, ip)
v, ok := mCache.retrieve(256, ip, 33792)
if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
Expand All @@ -48,9 +48,9 @@ func TestMemCacheInsert(t *testing.T) {
mCache := GetCache("cache.file")

tpl.TemplateID = 310
mCache.insert(310, ip, tpl)
mCache.insert(310, ip, tpl, 513)

v, ok := mCache.retrieve(310, ip)
v, ok := mCache.retrieve(310, ip, 513)
if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
Expand All @@ -65,15 +65,45 @@ func TestMemCacheAllSetIds(t *testing.T) {
mCache := GetCache("cache.file")

tpl.TemplateID = 310
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.TemplateID = 410
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.TemplateID = 210
mCache.insert(tpl.TemplateID, ip, tpl)
mCache.insert(tpl.TemplateID, ip, tpl, 513)

expected := []int{210, 310, 410}
actual := mCache.allSetIds()
if !reflect.DeepEqual(expected, actual) {
t.Errorf("Expected set IDs %v, got %v", expected, actual)
}
}

func TestMemCache_keyWithDifferentDomainIDs(t *testing.T) {
var tpl TemplateRecord
ip := net.ParseIP("127.0.0.1")
mCache := GetCache("cache.file")

tpl.TemplateID = 310
tpl.FieldCount = 19
mCache.insert(tpl.TemplateID, ip, tpl, 513)
tpl.FieldCount = 21
mCache.insert(tpl.TemplateID, ip, tpl, 514)

v, ok := mCache.retrieve(tpl.TemplateID, ip, 513)

if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
if v.FieldCount != 19 {
t.Error("expected template id#:310 with Field count#:19, got", v.TemplateID, v.FieldCount)
}

v, ok = mCache.retrieve(tpl.TemplateID, ip, 514)

if !ok {
t.Error("expected mCache retrieve status true, got", ok)
}
if v.FieldCount != 21 {
t.Error("expected template id#:310 with Field count#:21, got", v.TemplateID, v.FieldCount)
}
}
8 changes: 4 additions & 4 deletions netflow/v9/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,11 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
// This check is somewhat redundant with the switch-clause below, but the retrieve() operation should not be executed inside the loop.
if setHeader.FlowSetID > 255 {
var ok bool
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr)
tr, ok = mem.retrieve(setHeader.FlowSetID, d.raddr, msg.Header.SrcID)
if !ok {
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d",
err = nonfatalError(fmt.Errorf("%s unknown netflow template id# %d from sourceID %d",
d.raddr.String(),
setHeader.FlowSetID,
setHeader.FlowSetID, msg.Header.SrcID,
))
}
}
Expand All @@ -446,7 +446,7 @@ func (d *Decoder) decodeSet(mem MemCache, msg *Message) error {
err = tr.unmarshalOpts(d.reader)
}
if err == nil {
mem.insert(tr.TemplateID, d.raddr, tr)
mem.insert(tr.TemplateID, d.raddr, tr, msg.Header.SrcID)
}
} else if setId >= 4 && setId <= 255 {
// Reserved set, do not read any records
Expand Down
23 changes: 14 additions & 9 deletions netflow/v9/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,32 @@ func GetCache(cacheFile string) MemCache {
return m
}

func (m MemCache) getShard(id uint16, addr net.IP) (*TemplatesShard, uint32) {
b := make([]byte, 2)
binary.BigEndian.PutUint16(b, id)
key := append(addr, b...)

func (m MemCache) getShard(templateId uint16, addr net.IP, srcId uint32) (*TemplatesShard, uint32) {
var key []byte
hash := fnv.New32()
sId := make([]byte, 4)
tID := make([]byte, 2)
binary.LittleEndian.PutUint32(sId, srcId)
binary.BigEndian.PutUint16(tID, templateId)
key = append(key, addr...)
key = append(key, sId...)
key = append(key, tID...)

hash.Write(key)
hSum32 := hash.Sum32()

return m[uint(hSum32)%uint(shardNo)], hSum32
}

func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord) {
shard, key := m.getShard(id, addr)
func (m *MemCache) insert(id uint16, addr net.IP, tr TemplateRecord, srcID uint32) {
shard, key := m.getShard(id, addr, srcID)
shard.Lock()
defer shard.Unlock()
shard.Templates[key] = Data{tr, time.Now().Unix()}
}

func (m *MemCache) retrieve(id uint16, addr net.IP) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr)
func (m *MemCache) retrieve(id uint16, addr net.IP, srcID uint32) (TemplateRecord, bool) {
shard, key := m.getShard(id, addr, srcID)
shard.RLock()
defer shard.RUnlock()
v, ok := shard.Templates[key]
Expand Down