Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge step #2392

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,10 @@ func NewServerFromConfig() (*Server, error) {
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewAscendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
tr.AddTraversalExtension(ge.NewMergeTraversalExtension())

// new flow subscriber endpoints
flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend))
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.7.0
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c
github.com/tebeka/go2xunit v1.4.10
github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20160928074757-e7cb7fa329f4/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
1 change: 1 addition & 0 deletions graffiti/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/skydive-project/go-debouncer v1.0.0
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.7.0
github.com/tchap/zapext v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.16.0
Expand Down
22 changes: 22 additions & 0 deletions graffiti/graph/cachedbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ func (c *CachedBackend) GetNode(i Identifier, t Context) []*Node {
return c.persistent.GetNode(i, t)
}

// GetNodesFromIDs retrieve the list of nodes for the list of identifiers from the cache within a time slice
func (c *CachedBackend) GetNodesFromIDs(i []Identifier, t Context) []*Node {
mode := c.cacheMode.Load()

if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
return c.memory.GetNodesFromIDs(i, t)
}

return c.persistent.GetNodesFromIDs(i, t)
}

// GetNodeEdges retrieve a list of edges from a node within a time slice, matching metadata
func (c *CachedBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) {
mode := c.cacheMode.Load()
Expand All @@ -110,6 +121,17 @@ func (c *CachedBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edge
return c.persistent.GetNodeEdges(n, t, m)
}

// GetNodesEdges return the list of all edges for a list of nodes within time slice, matching metadata
func (c *CachedBackend) GetNodesEdges(n []*Node, t Context, m ElementMatcher) (edges []*Edge) {
mode := c.cacheMode.Load()

if t.TimeSlice == nil || mode == CacheOnlyMode || c.persistent == nil {
return c.memory.GetNodesEdges(n, t, m)
}

return c.persistent.GetNodesEdges(n, t, m)
}

// EdgeAdded add an edge in the cache
func (c *CachedBackend) EdgeAdded(e *Edge) error {
mode := c.cacheMode.Load()
Expand Down
110 changes: 110 additions & 0 deletions graffiti/graph/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ const graphElementMapping = `
const (
nodeType = "node"
edgeType = "edge"
// maxClauseCount limit the number of clauses in one query to ES
maxClauseCount = 512
)

// ElasticSearchBackend describes a persistent backend based on ElasticSearch
Expand Down Expand Up @@ -242,6 +244,44 @@ func (b *ElasticSearchBackend) GetNode(i Identifier, t Context) []*Node {
return nodes
}

// GetNodesFromIDs get the list of nodes for the list of identifiers within a time slice
func (b *ElasticSearchBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
if len(identifiersList) == 0 {
return []*Node{}
}

// ES default max number of clauses is set by default to 1024
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-settings.html
// Group queries in a maximum of half of the max.
// Other filters (time), will be also in the query.
identifiersBatch := batchIdentifiers(identifiersList, maxClauseCount)

nodes := []*Node{}

for _, idList := range identifiersBatch {
identifiersFilter := []*filters.Filter{}
for _, i := range idList {
identifiersFilter = append(identifiersFilter, filters.NewTermStringFilter("ID", string(i)))
}
identifiersORFilter := filters.NewOrFilter(identifiersFilter...)

nodes = append(nodes, b.searchNodes(&TimedSearchQuery{
SearchQuery: filters.SearchQuery{
Filter: identifiersORFilter,
Sort: true,
SortBy: "Revision",
},
TimeFilter: getTimeFilter(t.TimeSlice),
})...)
}

if len(nodes) > 1 && t.TimePoint {
return []*Node{nodes[len(nodes)-1]}
}

return nodes
}

func (b *ElasticSearchBackend) indexEdge(e *Edge) error {
raw, err := edgeToRaw(e)
if err != nil {
Expand Down Expand Up @@ -506,6 +546,53 @@ func (b *ElasticSearchBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher
return
}

// GetNodesEdges return the list of all edges for a list of nodes within time slice
func (b *ElasticSearchBackend) GetNodesEdges(nodeList []*Node, t Context, m ElementMatcher) (edges []*Edge) {
if len(nodeList) == 0 {
return []*Edge{}
}

// See comment at GetNodesFromIDs
// As we are adding two operations per item, make small batches
nodesBatch := batchNodes(nodeList, maxClauseCount/2)

for _, nList := range nodesBatch {
var filter *filters.Filter
if m != nil {
f, err := m.Filter()
if err != nil {
return []*Edge{}
}
filter = f
}

var searchQuery filters.SearchQuery
if !t.TimePoint {
searchQuery = filters.SearchQuery{Sort: true, SortBy: "UpdatedAt"}
}

nodesFilter := []*filters.Filter{}
for _, n := range nList {
nodesFilter = append(nodesFilter, filters.NewTermStringFilter("Parent", string(n.ID)))
nodesFilter = append(nodesFilter, filters.NewTermStringFilter("Child", string(n.ID)))
}
searchQuery.Filter = filters.NewOrFilter(nodesFilter...)

edges = append(edges, b.searchEdges(&TimedSearchQuery{
SearchQuery: searchQuery,
TimeFilter: getTimeFilter(t.TimeSlice),
ElementFilter: filter,
})...)

}

if len(edges) > 1 && t.TimePoint {
edges = dedupEdges(edges)
}

return
}

// IsHistorySupported returns that this backend does support history
func (b *ElasticSearchBackend) IsHistorySupported() bool {
return true
Expand Down Expand Up @@ -647,3 +734,26 @@ func NewElasticSearchBackendFromConfig(cfg es.Config, extraDynamicTemplates map[

return newElasticSearchBackendFromClient(client, cfg.IndexPrefix, liveIndex, archiveIndex, logger), nil
}

func batchNodes(items []*Node, batchSize int) [][]*Node {
batches := make([][]*Node, 0, (len(items)+batchSize-1)/batchSize)

for batchSize < len(items) {
items, batches = items[batchSize:], append(batches, items[0:batchSize:batchSize])
}
batches = append(batches, items)

return batches

}

func batchIdentifiers(items []Identifier, batchSize int) [][]Identifier {
batches := make([][]Identifier, 0, (len(items)+batchSize-1)/batchSize)

for batchSize < len(items) {
items, batches = items[batchSize:], append(batches, items[0:batchSize:batchSize])
}
batches = append(batches, items)

return batches
}
33 changes: 33 additions & 0 deletions graffiti/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ type Backend interface {
NodeAdded(n *Node) error
NodeDeleted(n *Node) error
GetNode(i Identifier, at Context) []*Node
GetNodesFromIDs(i []Identifier, at Context) []*Node
GetNodeEdges(n *Node, at Context, m ElementMatcher) []*Edge
GetNodesEdges(n []*Node, at Context, m ElementMatcher) []*Edge

EdgeAdded(e *Edge) error
EdgeDeleted(e *Edge) error
Expand Down Expand Up @@ -573,6 +575,21 @@ func (n *Node) String() string {
return string(b)
}

func (n *Node) Copy() *Node {
return &Node{
graphElement: graphElement{
ID: n.ID,
Host: n.Host,
Origin: n.Origin,
CreatedAt: n.CreatedAt,
UpdatedAt: n.UpdatedAt,
DeletedAt: n.DeletedAt,
Revision: n.Revision,
Metadata: n.Metadata.Copy(),
},
}
}

// UnmarshalJSON custom unmarshal function
func (n *Node) UnmarshalJSON(b []byte) error {
// wrapper to avoid unmarshal infinite loop
Expand Down Expand Up @@ -1185,6 +1202,14 @@ func (g *Graph) GetNode(i Identifier) *Node {
return nil
}

// GetNodesFromIDs returns a list of nodes from a list of identifiers
func (g *Graph) GetNodesFromIDs(i []Identifier) []*Node {
if len(i) == 0 {
return []*Node{}
}
return g.backend.GetNodesFromIDs(i, g.context)
}

// CreateNode returns a new node not bound to a graph
func CreateNode(i Identifier, m Metadata, t Time, h string, o string) *Node {
n := &Node{
Expand Down Expand Up @@ -1365,6 +1390,14 @@ func (g *Graph) GetNodeEdges(n *Node, m ElementMatcher) []*Edge {
return g.backend.GetNodeEdges(n, g.context, m)
}

// GetNodesEdges returns the list with all edges for a list of nodes
func (g *Graph) GetNodesEdges(n []*Node, m ElementMatcher) []*Edge {
if len(n) == 0 {
return []*Edge{}
}
return g.backend.GetNodesEdges(n, g.context, m)
}

func (g *Graph) String() string {
j, _ := json.Marshal(g)
return string(j)
Expand Down
44 changes: 44 additions & 0 deletions graffiti/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func newGraph(t *testing.T) *Graph {
Expand Down Expand Up @@ -509,6 +512,47 @@ func TestEvents(t *testing.T) {
}
}

func TestNodeCopy(t *testing.T) {
n := &Node{
graphElement: graphElement{
ID: Identifier("id"),
Host: "Host",
Origin: "Origin",
CreatedAt: Time(time.Unix(100, 0)),
UpdatedAt: Time(time.Unix(200, 0)),
DeletedAt: Time(time.Unix(300, 0)),
Revision: 1,
Metadata: Metadata{"foo": "bar"},
},
}

cn := n.Copy()
assert.Equal(t, n, cn)

// Check that modifications in the copied node do not affect the origin node
ok := cn.Metadata.SetField("new", "value")
assert.Truef(t, ok, "copied node set metadata")
assert.NotEqualf(t, n, cn, "Metadata")

cn.Host = "modified"
assert.NotEqualf(t, n, cn, "Host")

cn.Origin = "modified"
assert.NotEqualf(t, n, cn, "Origin")

cn.Revision = 99
assert.NotEqualf(t, n, cn, "Revision")

cn.CreatedAt = Time(time.Unix(100, 99))
assert.NotEqualf(t, n, cn, "CreatedAt")

cn.UpdatedAt = Time(time.Unix(200, 99))
assert.NotEqualf(t, n, cn, "UpdatedAt")

cn.DeletedAt = Time(time.Unix(300, 99))
assert.NotEqualf(t, n, cn, "DeletedAt")
}

type FakeRecursiveListener1 struct {
DefaultGraphListener
graph *Graph
Expand Down
27 changes: 27 additions & 0 deletions graffiti/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ func (m *MemoryBackend) GetNode(i Identifier, t Context) []*Node {
return nil
}

// GetNodesFromIDs from the graph backend
func (m *MemoryBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
nodes := []*Node{}
for _, i := range identifiersList {
if n, ok := m.nodes[i]; ok {
nodes = append(nodes, n.Node)
}
}
return nodes
}

// GetNodeEdges returns a list of edges of a node
func (m *MemoryBackend) GetNodeEdges(n *Node, t Context, meta ElementMatcher) []*Edge {
edges := []*Edge{}
Expand All @@ -153,6 +164,22 @@ func (m *MemoryBackend) GetNodeEdges(n *Node, t Context, meta ElementMatcher) []
return edges
}

// GetNodesEdges returns the list of edges for a list of nodes
func (m *MemoryBackend) GetNodesEdges(nodeList []*Node, t Context, meta ElementMatcher) []*Edge {
edges := []*Edge{}
for _, n := range nodeList {
if n, ok := m.nodes[n.ID]; ok {
for _, e := range n.edges {
if e.MatchMetadata(meta) {
edges = append(edges, e.Edge)
}
}
}
}

return edges
}

// EdgeDeleted in the graph backend
func (m *MemoryBackend) EdgeDeleted(e *Edge) error {
if _, ok := m.edges[e.ID]; !ok {
Expand Down
Loading