Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow: Nto1CheckConnectivity #2014

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/server/packet_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (pi *PacketInjectorAPI) validateRequest(ppr *types.PacketInjection) error {
}

ipField := "IPV4"
if ppr.Type == "icmp6" || ppr.Type == "tcp6" || ppr.Type == "udp6" {
if ppr.Type == types.PiTypeICMP6 || ppr.Type == types.PiTypeTCP6 || ppr.Type == types.PiTypeUDP6 {
ipField = "IPV6"
}

Expand Down
31 changes: 29 additions & 2 deletions api/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ func (n *NodeRule) Validate() error {
return nil
}

const (
// PIModeUniqPerNode use a unique packet identifier per source node
PIModeUniqPerNode = iota
// PIModeRandom use random packet identifier for each packet
PIModeRandom

// PiTypeICMP4 icmpv4 packet
PiTypeICMP4 = "icmp4"
// PiTypeICMP6 icmpv6 packet
PiTypeICMP6 = "icmp6"
// PiTypeTCP4 ipv4 + tcp packet
PiTypeTCP4 = "tcp4"
// PiTypeTCP6 ipv6 + tcp packet
PiTypeTCP6 = "tcp6"
// PiTypeUDP4 ipv4 + udp packet
PiTypeUDP4 = "udp4"
// PiTypeUDP6 ipv6 + udp packet
PiTypeUDP6 = "udp6"
)

// PacketInjection packet injector API parameters
// easyjson:json
// swagger:model
Expand All @@ -256,7 +276,7 @@ type PacketInjection struct {
ICMPID uint16 `yaml:"ICMPID"`
Count uint64 `yaml:"Count"`
Interval uint64 `yaml:"Interval"`
Increment bool `yaml:"Increment"`
Mode int `yaml:"Mode"`
IncrementPayload int64 `yaml:"IncrementPayload"`
StartTime time.Time
Pcap []byte `yaml:"Pcap"`
Expand All @@ -270,7 +290,14 @@ func (pi *PacketInjection) GetName() string {

// Validate verifies the packet injection type is supported
func (pi *PacketInjection) Validate() error {
allowedTypes := map[string]bool{"icmp4": true, "icmp6": true, "tcp4": true, "tcp6": true, "udp4": true, "udp6": true}
allowedTypes := map[string]bool{
PiTypeICMP4: true,
PiTypeICMP6: true,
PiTypeTCP4: true,
PiTypeTCP6: true,
PiTypeUDP4: true,
PiTypeUDP6: true,
}
if _, ok := allowedTypes[pi.Type]; !ok {
return errors.New("given type is not supported")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/packet_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var PacketInjectionCreate = &cobra.Command{
ICMPID: request.ICMPID,
Count: request.Count,
Interval: request.Interval,
Increment: request.Increment,
Mode: request.Mode,
IncrementPayload: request.IncrementPayload,
TTL: request.TTL,
}
Expand Down
21 changes: 11 additions & 10 deletions cmd/injector/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os"
"time"

"github.com/skydive-project/skydive/api/types"
"github.com/skydive-project/skydive/common"
"github.com/skydive-project/skydive/packetinjector"
pi "github.com/skydive-project/skydive/packetinjector"
Expand All @@ -46,25 +47,25 @@ var (
id uint16
count uint64
interval uint64
increment bool
mode int
incrementPayload int64
ttl uint8
)

// AddInjectPacketInjectFlags add the command line flags for a packet injection
func AddInjectPacketInjectFlags(cmd *cobra.Command) {
cmd.Flags().StringVarP(&srcIP, "srcIP", "", "", "source node IP")
cmd.Flags().StringVarP(&dstIP, "dstIP", "", "", "destination node IP")
cmd.Flags().StringVarP(&srcMAC, "srcMAC", "", "", "source node MAC")
cmd.Flags().StringVarP(&dstMAC, "dstMAC", "", "", "destination node MAC")
cmd.Flags().Uint16VarP(&srcPort, "srcPort", "", 0, "source port for TCP packet")
cmd.Flags().Uint16VarP(&dstPort, "dstPort", "", 0, "destination port for TCP packet")
cmd.Flags().StringVarP(&srcIP, "src-ip", "", "", "source node IP")
cmd.Flags().StringVarP(&dstIP, "dst-ip", "", "", "destination node IP")
cmd.Flags().StringVarP(&srcMAC, "src-mac", "", "", "source node MAC")
cmd.Flags().StringVarP(&dstMAC, "dst-mac", "", "", "destination node MAC")
cmd.Flags().Uint16VarP(&srcPort, "src-port", "", 0, "source port for TCP packet")
cmd.Flags().Uint16VarP(&dstPort, "dst-port", "", 0, "destination port for TCP packet")
cmd.Flags().StringVarP(&packetType, "type", "", "icmp4", "packet type: icmp4, icmp6, tcp4, tcp6, udp4 and udp6")
cmd.Flags().StringVarP(&payload, "payload", "", "", "payload")
cmd.Flags().StringVar(&pcap, "pcap", "", "PCAP file")
cmd.Flags().Uint16VarP(&id, "id", "", 0, "ICMP identification")
cmd.Flags().BoolVarP(&increment, "increment", "", false, "increment ICMP id for each packet")
cmd.Flags().Int64VarP(&incrementPayload, "incrementPayload", "", 0, "increase payload for each packet")
cmd.Flags().IntVarP(&mode, "mode", "", types.PIModeRandom, "specify type of modification between injections")
cmd.Flags().Int64VarP(&incrementPayload, "inc-payload", "", 0, "increase payload each packet")
cmd.Flags().Uint64VarP(&count, "count", "", 1, "number of packets to be generated")
cmd.Flags().Uint64VarP(&interval, "interval", "", 0, "wait interval milliseconds between sending each packet")
cmd.Flags().Uint8VarP(&ttl, "ttl", "", 64, "IP time-to-live header")
Expand Down Expand Up @@ -110,7 +111,7 @@ func GetPacketInjectRequest() (*pi.PacketInjectionRequest, error) {
Count: count,
ICMPID: id,
Interval: interval,
Increment: increment,
Mode: mode,
IncrementPayload: incrementPayload,
Payload: payload,
Pcap: pcapContent,
Expand Down
22 changes: 20 additions & 2 deletions flow/ondemand/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,33 @@ func (h *onDemandFlowHandler) ResourceName() string {
return "Capture"
}

func (h *onDemandFlowHandler) GetNodes(resource types.Resource) []interface{} {
func (h *onDemandFlowHandler) GetNodeResources(resource types.Resource) []client.OnDemandNodeResource {
var nrs []client.OnDemandNodeResource

capture := resource.(*types.Capture)

query := capture.GremlinQuery
query += fmt.Sprintf(".Dedup().Has('Captures.ID', NEE('%s'))", resource.ID())
if capture.Type != "" && !common.CheckProbeCapabilities(capture.Type, common.MultipleOnSameNodeCapability) {
query += fmt.Sprintf(".Has('Captures.Type', NEE('%s'))", capture.Type)
}
query += h.nodeTypeQuery
return h.applyGremlinExpr(query)

if nodes := h.applyGremlinExpr(query); len(nodes) > 0 {
for _, i := range nodes {
switch i.(type) {
case *graph.Node:
nrs = append(nrs, client.OnDemandNodeResource{Node: i.(*graph.Node), Resource: capture})
case []*graph.Node:
// case of shortestpath that returns a list of nodes
for _, node := range i.([]*graph.Node) {
nrs = append(nrs, client.OnDemandNodeResource{Node: node, Resource: capture})
}
}
}
}

return nrs
}

func (h *onDemandFlowHandler) applyGremlinExpr(query string) []interface{} {
Expand Down
4 changes: 2 additions & 2 deletions js/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ export function REGEX(param: any): Predicate {
return new Predicate("REGEX", param)
}

export function WITHIN(...params: any[]): Predicate {
return new Predicate("WITHIN", ...params)
export function Within(...params: any[]): Predicate {
return new Predicate("Within", ...params)
}

export function WITHOUT(...params: any[]): Predicate {
Expand Down
2 changes: 1 addition & 1 deletion js/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ window.GTE = apiLib.GTE
window.LTE = apiLib.LTE
window.IPV4RANGE = apiLib.IPV4RANGE
window.REGEX = apiLib.REGEX
window.WITHIN = apiLib.WITHIN
window.Within = apiLib.Within
window.WITHOUT = apiLib.WITHOUT
window.INSIDE = apiLib.INSIDE
window.OUTSIDE = apiLib.OUTSIDE
Expand Down
36 changes: 22 additions & 14 deletions ondemand/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ import (
ws "github.com/skydive-project/skydive/websocket"
)

type OnDemandNodeResource struct {
Node *graph.Node
Resource types.Resource
}

// OnDemandClientHandler is the interface to be implemented by ondemand clients
type OnDemandClientHandler interface {
ResourceName() string
GetNodes(resource types.Resource) []interface{}
GetNodeResources(resource types.Resource) []OnDemandNodeResource
CheckState(n *graph.Node, resource types.Resource) bool
DecodeMessage(msg json.RawMessage) (types.Resource, error)
EncodeMessage(nodeID graph.Identifier, resource types.Resource) (json.RawMessage, error)
Expand Down Expand Up @@ -180,25 +185,28 @@ func (o *OnDemandClient) unregisterTask(node *graph.Node, resource types.Resourc
return true
}

func (o *OnDemandClient) nodeTasks(nodes []interface{}, resource types.Resource) map[graph.Identifier]nodeTask {
toRegister := func(node *graph.Node) (nodeID graph.Identifier, host string, register bool) {
func (o *OnDemandClient) nodeTasks(nrs []OnDemandNodeResource) map[graph.Identifier]nodeTask {
toRegister := func(nr OnDemandNodeResource) (nodeID graph.Identifier, host string, register bool) {
// check not already registered
tasks, ok := o.registeredNodes[node.ID]
tasks, ok := o.registeredNodes[nr.Node.ID]
if ok {
ok = tasks[resource.ID()]
ok = tasks[nr.Resource.ID()]
}

if ok {
logging.GetLogger().Debugf("%s already registered on %s", resource.ID(), node.ID)
logging.GetLogger().Debugf("%s already registered on %s", nr.Resource.ID(), nr.Node.ID)
return
}

return node.ID, node.Host, true
return nr.Node.ID, nr.Node.Host, true
}

nps := map[graph.Identifier]nodeTask{}
for _, i := range nodes {
switch i.(type) {
for _, nr := range nrs {
if nodeID, host, ok := toRegister(nr); ok {
nps[nodeID] = nodeTask{nodeID, host, nr.Resource}
}
/*switch i.(type) {
case *graph.Node:
node := i.(*graph.Node)
if nodeID, host, ok := toRegister(node); ok {
Expand All @@ -211,7 +219,7 @@ func (o *OnDemandClient) nodeTasks(nodes []interface{}, resource types.Resource)
nps[nodeID] = nodeTask{nodeID, host, resource}
}
}
}
}*/
}

return nps
Expand All @@ -231,8 +239,8 @@ func (o *OnDemandClient) checkForRegistrationCallback() {
defer o.RUnlock()

for _, resource := range o.resources {
if nodes := o.handler.GetNodes(resource); len(nodes) > 0 {
if nps := o.nodeTasks(nodes, resource); len(nps) > 0 {
if nrs := o.handler.GetNodeResources(resource); len(nrs) > 0 {
if nps := o.nodeTasks(nrs); len(nps) > 0 {
go o.registerTasks(nps)
}
}
Expand Down Expand Up @@ -312,8 +320,8 @@ func (o *OnDemandClient) registerResource(resource types.Resource) {

o.resources[resource.ID()] = resource

if nodes := o.handler.GetNodes(resource); len(nodes) > 0 {
if nps := o.nodeTasks(nodes, resource); len(nps) > 0 {
if nrs := o.handler.GetNodeResources(resource); len(nrs) > 0 {
if nps := o.nodeTasks(nrs); len(nps) > 0 {
go o.registerTasks(nps)
}
}
Expand Down
47 changes: 43 additions & 4 deletions packetinjector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (h *onDemandPacketInjectionHandler) createRequest(nodeID graph.Identifier,
Count: pi.Count,
Interval: pi.Interval,
ICMPID: pi.ICMPID,
Increment: pi.Increment,
Mode: pi.Mode,
IncrementPayload: pi.IncrementPayload,
TTL: pi.TTL,
}
Expand Down Expand Up @@ -228,10 +228,49 @@ func (h *onDemandPacketInjectionHandler) ResourceName() string {
return "PacketInjection"
}

func (h *onDemandPacketInjectionHandler) GetNodes(resource types.Resource) []interface{} {
query := resource.(*types.PacketInjection).Src
func (h *onDemandPacketInjectionHandler) GetNodeResources(resource types.Resource) []client.OnDemandNodeResource {
var nrs []client.OnDemandNodeResource

pi := resource.(*types.PacketInjection)

query := pi.Src
query += fmt.Sprintf(".Dedup('TID').Has('PacketInjections.ID', NEE('%s'))", resource.ID())
return h.applyGremlinExpr(query)

if nodes := h.applyGremlinExpr(query); len(nodes) > 0 {
id := pi.ICMPID
srcPort := pi.SrcPort

addNrs := func(n *graph.Node) {
r := *pi
r.ICMPID = id
r.SrcPort = srcPort

nrs = append(nrs, client.OnDemandNodeResource{Node: n, Resource: &r})

if r.Mode == types.PIModeUniqPerNode {
switch r.Type {
case types.PiTypeICMP4, types.PiTypeICMP6:
id++
default:
srcPort++
}
}
}

for _, i := range nodes {
switch i.(type) {
case *graph.Node:
addNrs(i.(*graph.Node))
case []*graph.Node:
// case of shortestpath that returns a list of nodes
for _, node := range i.([]*graph.Node) {
addNrs(node)
}
}
}
}

return nrs
}

func (h *onDemandPacketInjectionHandler) applyGremlinExpr(query string) []interface{} {
Expand Down
Loading