Skip to content

Commit

Permalink
Neighbors step
Browse files Browse the repository at this point in the history
Like Descendants, but using edges in any direction (Descendants only
uses edges from parent to child, Neighbors uses from parent to child and
from child to parent).

The difference from the Both step is that Neighbors accumulates the nodes it has seen.
Example (pseudo-syntax):
A -> B -> C
G.V(A).Out().Out() return: C.

But:
G.V(A).Neighbors(2) return: A,B,C

The parameters allowed are the same as in Descendants.
Example:
G.V('foo').Neighbors('RelationType',Within('ownership','foobar'),2)

To improve speed and reduce backend load when using persistent backends,
a new method, GetNodesFromIDs, is implemented in Graph and Backends.
This method only uses one call (or a few if we have hundreds of nodes,
see batching) to get all nodes from the backend.

Batching is used to avoid hitting the max number of clauses set by ES
(is set to the default value of 512).
  • Loading branch information
adrianlzt committed Aug 9, 2021
1 parent e455f7a commit d66e0ea
Show file tree
Hide file tree
Showing 11 changed files with 778 additions and 0 deletions.
1 change: 1 addition & 0 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func NewServerFromConfig() (*Server, error) {
tr.AddTraversalExtension(ge.NewSocketsTraversalExtension())
tr.AddTraversalExtension(ge.NewDescendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewAscendantsTraversalExtension())
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.7.0
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c
github.com/tebeka/go2xunit v1.4.10
github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f
Expand Down
11 changes: 11 additions & 0 deletions graffiti/graph/cachedbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,17 @@ func (c *CachedBackend) GetNode(i Identifier, t Context) []*Node {
return c.persistent.GetNode(i, t)
}

// GetNodesFromIDs returns the nodes matching the given identifiers.
//
// The lookup is served by the in-memory backend when the context carries no
// time slice, when the cache is in CacheOnlyMode, or when no persistent
// backend is configured. In every other case the persistent backend is
// queried.
func (c *CachedBackend) GetNodesFromIDs(i []Identifier, t Context) []*Node {
	mode := c.cacheMode.Load()

	usePersistent := t.TimeSlice != nil && mode != CacheOnlyMode && c.persistent != nil
	if usePersistent {
		return c.persistent.GetNodesFromIDs(i, t)
	}

	return c.memory.GetNodesFromIDs(i, t)
}

// GetNodeEdges retrieve a list of edges from a node within a time slice, matching metadata
func (c *CachedBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) {
mode := c.cacheMode.Load()
Expand Down
49 changes: 49 additions & 0 deletions graffiti/graph/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,44 @@ func (b *ElasticSearchBackend) GetNode(i Identifier, t Context) []*Node {
return nodes
}

// GetNodesFromIDs returns the nodes matching the given identifiers within the
// time slice of the context.
//
// The identifiers are queried in batches: each batch becomes one OR filter
// over the "ID" field, combined with the time filter, and the results of all
// batches are concatenated. An empty identifier list returns an empty slice
// without querying the backend.
//
// When the context is a time point, only one node per identifier is returned:
// the last one found for that identifier. Results are sorted by "Revision",
// so, as in the single-identifier lookup, the last entry is taken to be the
// most recent revision.
func (b *ElasticSearchBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
	if len(identifiersList) == 0 {
		return []*Node{}
	}

	// ES limits the number of clauses of a query (1024 by default, see
	// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-settings.html).
	// The identifiers are grouped in batches of at most maxClauseCount so that
	// the OR filter, together with the other filters of the query (time),
	// stays below that limit.
	identifiersBatch := batchIdentifiers(identifiersList, maxClauseCount)

	nodes := make([]*Node, 0, len(identifiersList))

	for _, idList := range identifiersBatch {
		identifiersFilter := make([]*filters.Filter, 0, len(idList))
		for _, i := range idList {
			identifiersFilter = append(identifiersFilter, filters.NewTermStringFilter("ID", string(i)))
		}
		identifiersORFilter := filters.NewOrFilter(identifiersFilter...)

		nodes = append(nodes, b.searchNodes(&TimedSearchQuery{
			SearchQuery: filters.SearchQuery{
				Filter: identifiersORFilter,
				Sort:   true,
				SortBy: "Revision",
			},
			TimeFilter: getTimeFilter(t.TimeSlice),
		})...)
	}

	if !t.TimePoint || len(nodes) < 2 {
		return nodes
	}

	// Time point: keep a single revision per identifier. Returning only the
	// very last node of the whole result (as done for a single identifier)
	// would drop every other requested node, so the last entry seen for each
	// identifier is kept instead, in order of first appearance.
	latest := make([]*Node, 0, len(identifiersList))
	position := make(map[Identifier]int, len(identifiersList))
	for _, n := range nodes {
		if idx, seen := position[n.ID]; seen {
			latest[idx] = n
			continue
		}
		position[n.ID] = len(latest)
		latest = append(latest, n)
	}

	return latest
}

func (b *ElasticSearchBackend) indexEdge(e *Edge) error {
raw, err := edgeToRaw(e)
if err != nil {
Expand Down Expand Up @@ -708,3 +746,14 @@ func batchNodes(items []*Node, batchSize int) [][]*Node {
return batches

}

// batchIdentifiers splits items into consecutive batches of at most batchSize
// elements, preserving order. Every batch but the last holds exactly
// batchSize elements; the last one holds the remainder.
//
// The returned batches are sub-slices of items (no identifiers are copied).
// Full batches have their capacity capped to batchSize so that appending to
// one of them cannot overwrite the following batch.
//
// An empty items slice yields a single empty batch. A batchSize lower than or
// equal to zero disables batching and yields a single batch holding all the
// items, instead of panicking.
func batchIdentifiers(items []Identifier, batchSize int) [][]Identifier {
	if batchSize <= 0 {
		return [][]Identifier{items}
	}

	// len/batchSize+1 is an upper bound of the number of batches and, unlike
	// (len+batchSize-1)/batchSize, cannot overflow for very large batch sizes.
	batches := make([][]Identifier, 0, len(items)/batchSize+1)

	for len(items) > batchSize {
		batches = append(batches, items[:batchSize:batchSize])
		items = items[batchSize:]
	}

	return append(batches, items)
}
9 changes: 9 additions & 0 deletions graffiti/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type Backend interface {
NodeAdded(n *Node) error
NodeDeleted(n *Node) error
GetNode(i Identifier, at Context) []*Node
GetNodesFromIDs(i []Identifier, at Context) []*Node
GetNodeEdges(n *Node, at Context, m ElementMatcher) []*Edge
GetNodesEdges(n []*Node, at Context, m ElementMatcher) []*Edge

Expand Down Expand Up @@ -1186,6 +1187,14 @@ func (g *Graph) GetNode(i Identifier) *Node {
return nil
}

// GetNodesFromIDs resolves a list of identifiers into nodes using the graph
// backend and the graph context. An empty list is answered with an empty
// slice without calling the backend.
func (g *Graph) GetNodesFromIDs(i []Identifier) []*Node {
	if len(i) > 0 {
		return g.backend.GetNodesFromIDs(i, g.context)
	}
	return []*Node{}
}

// CreateNode returns a new node not bound to a graph
func CreateNode(i Identifier, m Metadata, t Time, h string, o string) *Node {
n := &Node{
Expand Down
11 changes: 11 additions & 0 deletions graffiti/graph/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ func (m *MemoryBackend) GetNode(i Identifier, t Context) []*Node {
return nil
}

// GetNodesFromIDs returns the nodes stored in memory for the given
// identifiers, in the order of the identifiers. Identifiers that are not
// present are silently skipped. The context t is not used.
func (m *MemoryBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) []*Node {
	found := []*Node{}
	for idx := range identifiersList {
		entry, present := m.nodes[identifiersList[idx]]
		if !present {
			continue
		}
		found = append(found, entry.Node)
	}
	return found
}

// GetNodeEdges returns a list of edges of a node
func (m *MemoryBackend) GetNodeEdges(n *Node, t Context, meta ElementMatcher) []*Edge {
edges := []*Edge{}
Expand Down
17 changes: 17 additions & 0 deletions graffiti/graph/orientdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,23 @@ func (o *OrientDBBackend) GetNode(i Identifier, t Context) (nodes []*Node) {
return o.searchNodes(t, query)
}

// GetNodesFromIDs returns the nodes matching the given identifiers within the
// time slice of the context, using a single query that ORs all identifiers
// and orders the result by Revision.
//
// An empty identifier list returns an empty slice without querying the
// backend (the generated query would otherwise be malformed).
//
// When the context is a time point, only one node per identifier is returned:
// the last one found for that identifier, which is the highest revision given
// the ascending "ORDER BY Revision".
//
// NOTE(review): identifiers are interpolated in the query without escaping;
// confirm they can never contain a quote.
func (o *OrientDBBackend) GetNodesFromIDs(identifiersList []Identifier, t Context) (nodes []*Node) {
	if len(identifiersList) == 0 {
		return []*Node{}
	}

	query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil)
	query += " AND ("
	for i, id := range identifiersList {
		if i > 0 {
			query += " OR"
		}
		query += fmt.Sprintf(" ID = '%s'", id)
	}
	query += ") ORDER BY Revision"

	nodes = o.searchNodes(t, query)
	if !t.TimePoint || len(nodes) < 2 {
		return nodes
	}

	// Time point: keep a single revision per identifier. A "DESC LIMIT 1" on
	// the whole query would return one node for the whole list of
	// identifiers, so the reduction is done here, keeping the last entry seen
	// for each identifier in order of first appearance.
	latest := make([]*Node, 0, len(identifiersList))
	position := make(map[Identifier]int, len(identifiersList))
	for _, n := range nodes {
		if idx, seen := position[n.ID]; seen {
			latest[idx] = n
			continue
		}
		position[n.ID] = len(latest)
		latest = append(latest, n)
	}

	return latest
}

// GetNodeEdges returns a list of a node edges within time slice
func (o *OrientDBBackend) GetNodeEdges(n *Node, t Context, m ElementMatcher) (edges []*Edge) {
query := orientdb.FilterToExpression(getTimeFilter(t.TimeSlice), nil)
Expand Down
188 changes: 188 additions & 0 deletions gremlin/traversal/neighbors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package traversal

import (
"github.com/pkg/errors"

"github.com/skydive-project/skydive/graffiti/filters"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/graph/traversal"
"github.com/skydive-project/skydive/topology"
)

// NeighborsTraversalExtension is the Gremlin traversal extension providing the
// Neighbors step: ScanIdent recognises the step keyword and ParseStep builds
// the step from its parameters.
type NeighborsTraversalExtension struct {
// NeighborsToken is the token returned by ScanIdent for the "NEIGHBORS"
// identifier and matched by ParseStep.
NeighborsToken traversal.Token
}

// NeighborsGremlinTraversalStep navigates the graph starting from a node, following edges
// from parent to child and from child to parent.
// It follows the same syntax as the Ascendants and Descendants steps.
// The behaviour is like Ascendants+Descendants combined.
// If no parameter is defined, the ownership edge filter and a depth of one are used.
// If only one param is defined, it is used as depth, eg: G.V('id').Neighbors(4)
// If we have an even number of parameters, they are used as edge filter, and
// depth is defaulted to one, eg.: G.V('id').Neighbors("Type","foo","RelationType","bar")
// If we have an odd, but >1, number of parameters, all but the last one are used as
// edge filters and the last one as depth, eg.: G.V('id').Neighbors("Type","foo","RelationType","bar",3)
type NeighborsGremlinTraversalStep struct {
// context is the traversal context (including the step parameters) given to ParseStep
context traversal.GremlinTraversalContext
// maxDepth is the maximum number of edges followed from the origin nodes
maxDepth int64
// edgeFilter selects the edges that may be followed
edgeFilter graph.ElementMatcher
// nextStepOnlyIDs is set to true if the next step only needs node IDs and not the whole node info
// NOTE(review): no code visible in this file sets it; confirm where it is enabled.
nextStepOnlyIDs bool
}

// NewNeighborsTraversalExtension creates the traversal extension that adds
// the Neighbors step, bound to the package level Neighbors token.
func NewNeighborsTraversalExtension() *NeighborsTraversalExtension {
	ext := new(NeighborsTraversalExtension)
	ext.NeighborsToken = traversalNeighborsToken
	return ext
}

// ScanIdent maps the "NEIGHBORS" identifier to the Neighbors token. Any other
// identifier is reported as not handled (traversal.IDENT, false).
func (e *NeighborsTraversalExtension) ScanIdent(s string) (traversal.Token, bool) {
	if s == "NEIGHBORS" {
		return e.NeighborsToken, true
	}
	return traversal.IDENT, false
}

// ParseStep builds a Neighbors step from the parameters of the traversal
// context. Tokens other than the Neighbors token are not handled and yield
// (nil, nil).
//
// Defaults are a depth of one and the ownership edge filter. With two or more
// parameters, the largest even-sized prefix is parsed as key/value pairs
// combined with OR and replaces the default edge filter. With an odd number
// of parameters, the last one must be an int64 and is used as the depth.
func (e *NeighborsTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
	if t != e.NeighborsToken {
		return nil, nil
	}

	depth := int64(1)
	// NOTE(review): the error returned by Filter() is discarded here.
	matcher, _ := topology.OwnershipMetadata().Filter()

	count := len(p.Params)

	// Key/value pairs: everything but a trailing unpaired parameter.
	if count >= 2 {
		pairs := count - count%2
		parsed, err := traversal.ParamsToFilter(filters.BoolFilterOp_OR, p.Params[:pairs]...)
		if err != nil {
			return nil, errors.Wrap(err, "Neighbors accepts an optional number of key/value tuples and an optional depth")
		}
		matcher = parsed
	}

	// A trailing unpaired parameter is the depth.
	if count%2 == 1 {
		value, isInt := p.Params[count-1].(int64)
		if !isInt {
			return nil, errors.New("Neighbors last argument must be the maximum depth specified as an integer")
		}
		depth = value
	}

	step := &NeighborsGremlinTraversalStep{
		context:    p,
		maxDepth:   depth,
		edgeFilter: graph.NewElementFilter(matcher),
	}
	return step, nil
}

// getNeighbors returns the given nodes plus every node that can be reached
// from them by following at most maxDepth edges, in any direction. Only edges
// matching edgeFilter are followed.
//
// The nodes passed to this function are always part of the response. The
// result is ordered by discovery: origin nodes first, then the nodes found at
// each depth. When nextStepOnlyIDs is set, the returned nodes only carry
// their ID; otherwise the nodes are fetched from the graph with a single
// GetNodesFromIDs call.
func (d *NeighborsGremlinTraversalStep) getNeighbors(g *graph.Graph, nodes []*graph.Node) []*graph.Node {
	// visited avoids visiting the same node twice, orderedIDs keeps the
	// discovery order so that the result does not depend on map iteration.
	visited := make(map[graph.Identifier]struct{}, len(nodes))
	orderedIDs := make([]graph.Identifier, 0, len(nodes))

	// frontier holds the IDs of the nodes to expand at the current depth.
	// It starts with the origin nodes, which are also part of the result.
	frontier := make([]graph.Identifier, 0, len(nodes))
	for _, n := range nodes {
		if _, seen := visited[n.ID]; seen {
			continue
		}
		visited[n.ID] = struct{}{}
		orderedIDs = append(orderedIDs, n.ID)
		frontier = append(frontier, n.ID)
	}

	// Breadth-first, level by level expansion: every node found at depth k is
	// expanded before any node at depth k+1, so a node is always expanded at
	// the smallest depth it can be reached. Example:
	//   A -> B
	//   B -> C
	//   C -> D
	//   A -> C
	// With depth=2 the result is A,B,C,D: C is found at depth 1 (A->C) and
	// expanded at depth 2. A depth-first walk with a visited set could reach
	// C through A->B->C at the depth limit first and never expand it.
	//
	// The walk stops as soon as a level finds no new node, to avoid useless
	// calls to the backend.
	for depth := int64(0); depth < d.maxDepth && len(frontier) > 0; depth++ {
		// GetNodesEdges needs nodes, not IDs. "Empty" nodes (just the ID) are
		// used to avoid extra calls to the backend.
		frontierNodes := make([]*graph.Node, 0, len(frontier))
		for _, nID := range frontier {
			frontierNodes = append(frontierNodes, graph.CreateNode(nID, graph.Metadata{}, graph.Unix(0, 0), "", ""))
		}
		edges := g.GetNodesEdges(frontierNodes, d.edgeFilter)

		// The IDs have been copied into frontierNodes: reuse the slice,
		// keeping its capacity, for the next depth.
		frontier = frontier[:0]

		for _, e := range edges {
			// We don't know on which side of the edge the neighbor is, so
			// consider both sides and keep the ones not already visited.
			for _, nID := range [...]graph.Identifier{e.Parent, e.Child} {
				if _, seen := visited[nID]; seen {
					continue
				}
				visited[nID] = struct{}{}
				orderedIDs = append(orderedIDs, nID)
				frontier = append(frontier, nID)
			}
		}
	}

	// Return "empty" nodes (just with the ID) if the next step only needs that info
	if d.nextStepOnlyIDs {
		ret := make([]*graph.Node, 0, len(orderedIDs))
		for _, nID := range orderedIDs {
			ret = append(ret, graph.CreateNode(nID, graph.Metadata{}, graph.Unix(0, 0), "", ""))
		}
		return ret
	}

	// Get all the nodes for the list of neighbors IDs in one call
	return g.GetNodesFromIDs(orderedIDs)
}

// Exec runs the Neighbors step on the result of the previous step. The
// previous step must be a vertex traversal (*traversal.GraphTraversalV),
// otherwise traversal.ErrExecutionError is returned. The neighbors are
// computed while holding the read lock of the graph traversal.
func (d *NeighborsGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
	vertices, isV := last.(*traversal.GraphTraversalV)
	if !isV {
		return nil, traversal.ErrExecutionError
	}

	gt := vertices.GraphTraversal

	gt.RLock()
	found := d.getNeighbors(gt.Graph, vertices.GetNodes())
	gt.RUnlock()

	return traversal.NewGraphTraversalV(gt, found), nil
}

// Reduce returns the next step unchanged, with a nil error: the Neighbors
// step does not inspect or alter the step that follows it.
func (d *NeighborsGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
return next, nil
}

// Context returns a pointer to the traversal context stored in the step by
// ParseStep.
func (d *NeighborsGremlinTraversalStep) Context() *traversal.GremlinTraversalContext {
return &d.context
}
Loading

0 comments on commit d66e0ea

Please sign in to comment.