Skip to content

Commit

Permalink
Merge step
Browse files Browse the repository at this point in the history
Aggregates elements from different revisions of the nodes into a new
metadata key.
Given a metadata element, that should be a map[string]interface{},
aggregate different values into another metadata key with format
map[string][]interface{}
Eg.:
  Metadata.data V1: {"a":{x}, "b":{y}}
  Metadata.data V2: {"a":{z}, "b":{y}}
  Metadata.agg:     {"a":[{x},{z}], "b":[{y}]}

It's purpose its to show data from past revisions of the same node.
Example:
G.At(1479899809,3600).V().Merge('data','agg')

It could be also called defining the time slice in the parameters
(since, from):
G.V().Merge('A','B',1500000000,1500099999)

This step return a modified copy of the last node, with all the
aggregated data, not the node stored in the graph.
This is to avoid modiying the node stored in the graph.

This PR also modifies the Reduce method of the Neighbors step.
Merge step only needs the node IDs, so Neighbors step could skip
retrieving the full content of nodes.
  • Loading branch information
adrianlzt committed Aug 20, 2021
1 parent 446f6a1 commit 133a80b
Show file tree
Hide file tree
Showing 9 changed files with 879 additions and 3 deletions.
1 change: 1 addition & 0 deletions analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func NewServerFromConfig() (*Server, error) {
tr.AddTraversalExtension(ge.NewNeighborsTraversalExtension())
tr.AddTraversalExtension(ge.NewNextHopTraversalExtension())
tr.AddTraversalExtension(ge.NewGroupTraversalExtension())
tr.AddTraversalExtension(ge.NewMergeTraversalExtension())

// new flow subscriber endpoints
flowSubscriberWSServer := ws.NewStructServer(config.NewWSServer(hub.HTTPServer(), "/ws/subscriber/flow", apiAuthBackend))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
github.com/stretchr/testify v1.6.1 // indirect
github.com/stretchr/testify v1.7.0
github.com/t-yuki/gocover-cobertura v0.0.0-20180217150009-aaee18c8195c
github.com/tebeka/go2xunit v1.4.10
github.com/tebeka/selenium v0.0.0-20170314201507-657e45ec600f
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20160928074757-e7cb7fa329f4/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down
2 changes: 1 addition & 1 deletion graffiti/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/skydive-project/go-debouncer v1.0.0
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.7.0
github.com/tchap/zapext v1.0.0
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.16.0
Expand Down
258 changes: 258 additions & 0 deletions gremlin/traversal/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package traversal

import (
"fmt"
"reflect"
"time"

"github.com/pkg/errors"
"github.com/skydive-project/skydive/graffiti/graph"
"github.com/skydive-project/skydive/graffiti/graph/traversal"
"github.com/skydive-project/skydive/graffiti/logging"
)

// MergeTraversalExtension describes a new extension to enhance the topology
type MergeTraversalExtension struct {
MergeToken traversal.Token
}

// MergeGremlinTraversalStep step aggregates elements from different revisions of the nodes into a new metadata key.
// Nodes returned by this step are copies of the nodes in the graph, not the actual nodes.
// The reason is because this step is not meant to modify nodes in the graph, just for the output.
// This step should be used with a presistant backend, so it can access previous revisions of the nodes.
// To use this step we should select a metadata key (first parameter), where the elements will be read from.
// Inside this Metadata.Key elements should have the format map[interface{}]interface{} (could be a type based on that).
// The second parameter is the metadata key where all the elements will be aggregated.
// The aggregation will with the format: map[string][]interface{}.
// All elements with the same key in the map will be joined in an slice.
// To use this step we can use a graph with a time period context, eg: G.At(1479899809,3600).V().Merge('A','B').
// Or we can define the time period in the step: G.V().Merge('A','B',1500000000,1500099999).
// Note that in this case we define the start and end time, while in "At" is start time and duration.
// In both cases, Merge step will use the nodes given by the previous step.
type MergeGremlinTraversalStep struct {
traversal.GremlinTraversalContext
MergeKey string
MergeAggKey string
StartTime time.Time
EndTime time.Time
}

// NewMergeTraversalExtension returns a new graph traversal extension
func NewMergeTraversalExtension() *MergeTraversalExtension {
return &MergeTraversalExtension{
MergeToken: traversalMergeToken,
}
}

// ScanIdent recognise the word associated with this step (in uppercase) and return a token
// which represents it. Return true if it have found a match
func (e *MergeTraversalExtension) ScanIdent(s string) (traversal.Token, bool) {
switch s {
case "MERGE":
return e.MergeToken, true
}
return traversal.IDENT, false
}

// ParseStep generate a step for a given token, having in 'p' context and params
func (e *MergeTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
switch t {
case e.MergeToken:
default:
return nil, nil
}

var mergeKey, mergeAggKey string
var startTime, endTime time.Time
var ok bool

switch len(p.Params) {
case 2:
mergeKey, ok = p.Params[0].(string)
if !ok {
return nil, errors.New("Merge first parameter have to be a string")
}
mergeAggKey, ok = p.Params[1].(string)
if !ok {
return nil, errors.New("Merge second parameter have to be a string")
}
case 4:
mergeKey, ok = p.Params[0].(string)
if !ok {
return nil, errors.New("Merge first parameter have to be a string")
}
mergeAggKey, ok = p.Params[1].(string)
if !ok {
return nil, errors.New("Merge second parameter have to be a string")
}
startTimeUnixEpoch, ok := p.Params[2].(int64)
if !ok {
return nil, errors.New("Merge third parameter have to be a int (unix epoch time)")
}
startTime = time.Unix(startTimeUnixEpoch, 0)
endTimeUnixEpoch, ok := p.Params[3].(int64)
if !ok {
return nil, errors.New("Merge fourth parameter have to be a int (unix epoch time)")
}
endTime = time.Unix(endTimeUnixEpoch, 0)
default:
return nil, errors.New("Merge parameter must have two or four parameters (OriginKey, DestinationKey, StartTime, EndTime)")
}

return &MergeGremlinTraversalStep{
GremlinTraversalContext: p,
MergeKey: mergeKey,
MergeAggKey: mergeAggKey,
StartTime: startTime,
EndTime: endTime,
}, nil
}

// Exec executes the merge step
func (s *MergeGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
switch tv := last.(type) {
case *traversal.GraphTraversalV:
return s.InterfaceMerge(tv)

}
return nil, traversal.ErrExecutionError
}

// Reduce merge step
func (s *MergeGremlinTraversalStep) Reduce(next traversal.GremlinTraversalStep) (traversal.GremlinTraversalStep, error) {
return next, nil
}

// Context merge step
func (s *MergeGremlinTraversalStep) Context() *traversal.GremlinTraversalContext {
return &s.GremlinTraversalContext
}

// InterfaceMerge for each node id, group all the elements stored in Metadata.key from the
// input nodes and put them into the newest node for each id into Metadata.aggKey.
// Merge are groupped based on its key. See mergedMetadata for an example.
// All output nodes will have Metadata.aggKey defined (empty or not).
func (s *MergeGremlinTraversalStep) InterfaceMerge(tv *traversal.GraphTraversalV) (traversal.GraphTraversalStep, error) {
// If user has defined start/end time in the parameters, use that values instead of the ones comming with the graph
if !s.StartTime.IsZero() && !s.EndTime.IsZero() {
timeSlice := graph.NewTimeSlice(
graph.Time(s.StartTime).UnixMilli(),
graph.Time(s.EndTime).UnixMilli(),
)
userTimeSliceCtx := graph.Context{
TimeSlice: timeSlice,
TimePoint: true,
}

newGraph, err := tv.GraphTraversal.Graph.CloneWithContext(userTimeSliceCtx)
if err != nil {
return nil, err
}
tv.GraphTraversal.Graph = newGraph
}

tv.GraphTraversal.RLock()
defer tv.GraphTraversal.RUnlock()

// uniqNodes store the latest node for each node identifier
uniqNodes := map[graph.Identifier]*graph.Node{}

// elements accumulate the elements for each node id
elements := map[graph.Identifier]map[string][]interface{}{}

// Get the list of node ids
nodesIDs := make([]graph.Identifier, 0, len(tv.GetNodes()))
for _, node := range tv.GetNodes() {
nodesIDs = append(nodesIDs, node.ID)
}

// Get all revision for the list of node ids
revisionNodes := tv.GraphTraversal.Graph.GetNodesFromIDs(nodesIDs)

// Store only the most recent nodes
for _, rNode := range revisionNodes {
storedNode, ok := uniqNodes[rNode.ID]
if !ok {
uniqNodes[rNode.ID] = rNode
} else {
if storedNode.Revision < rNode.Revision {
uniqNodes[rNode.ID] = rNode
}
}

// Store elements from all revisions into the "elements" variable
elements[rNode.ID] = mergeMetadata(rNode, s.MergeKey, elements[rNode.ID])
}

// Move the nodes from the uniqNodes map to an slice required by TraversalV
// Return a copy of the nodes, not the actual graph nodes, because we don't want
// to modify nodes with this step, just append some extra info
nodes := []*graph.Node{}
for id, n := range uniqNodes {
nCopy := n.Copy()

e, ok := elements[id]
if ok {
// Set the stored node with the merge of previous and current node
metadataSet := nCopy.Metadata.SetField(s.MergeAggKey, e)
if !metadataSet {
return nil, fmt.Errorf("unable to set elements metadata for copied node %v", id)
}
}

nodes = append(nodes, nCopy)
}

return traversal.NewGraphTraversalV(tv.GraphTraversal, nodes), nil
}

// mergeMetadata return the merge of node.Key elements with the ones already stored in nodeMerge
// Eg.:
// node: Metadata.key: {"a":{x}, "b":{y}}
// nodeMerge: {"a":[{z}]}
// return: Metadata.key: {"a":[{x},{z}], "b":[{y}]}
//
// Ignore if Metadata.key has an invalid format (not a map).
// Reflect is used to be able to access map's defined in different types.
// Element aggregation data type should be map[string]interface{} to be able to be encoded with JSON
func mergeMetadata(node *graph.Node, key string, nodeMerge map[string][]interface{}) map[string][]interface{} {
if nodeMerge == nil {
nodeMerge = map[string][]interface{}{}
}

n1MergeIface, n1Err := node.GetField(key)

if n1Err == nil {
// Ignore Metadata.key values which are not a map
n1MergeValue := reflect.ValueOf(n1MergeIface)

// If the metadata value is a pointer, resolve it
if n1MergeValue.Kind() == reflect.Ptr {
n1MergeValue = n1MergeValue.Elem()
}

// Merge step only accepts a map as data origin
if n1MergeValue.Kind() != reflect.Map {
logging.GetLogger().Errorf("Invalid type for elements, expecting a map, but it is %v", n1MergeValue.Kind())
return nodeMerge
}

iter := n1MergeValue.MapRange()
NODE_MERGE:
for iter.Next() {
k := fmt.Sprintf("%v", iter.Key().Interface())
v := iter.Value().Interface()

// Do not append if the same element already exists
for _, storedElement := range nodeMerge[k] {
if reflect.DeepEqual(storedElement, v) {
continue NODE_MERGE
}
}

nodeMerge[k] = append(nodeMerge[k], v)
}
}

return nodeMerge
}
Loading

0 comments on commit 133a80b

Please sign in to comment.