Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #5689: Use Traceflow API v1beta1 #5713

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions pkg/antctl/raw/traceflow/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/antctl/raw"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/apis/crd/v1beta1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
)

Expand Down Expand Up @@ -64,23 +64,23 @@ var protocols = map[string]int32{
}

type CapturedPacket struct {
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length uint16 `json:"length" yaml:"length"`
IPHeader *v1alpha1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1alpha1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1alpha1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
SrcIP string `json:"srcIP" yaml:"srcIP"`
DstIP string `json:"dstIP" yaml:"dstIP"`
Length int32 `json:"length" yaml:"length"`
IPHeader *v1beta1.IPHeader `json:"ipHeader,omitempty" yaml:"ipHeader,omitempty"`
IPv6Header *v1beta1.IPv6Header `json:"ipv6Header,omitempty" yaml:"ipv6Header,omitempty"`
TransportHeader *v1beta1.TransportHeader `json:"transportHeader,omitempty" yaml:"tranportHeader,omitempty"`
}

// Response is the response of antctl Traceflow.
type Response struct {
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1alpha1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1alpha1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
Name string `json:"name" yaml:"name"` // Traceflow name
Phase v1beta1.TraceflowPhase `json:"phase,omitempty" yaml:"phase,omitempty"` // Traceflow phase
Reason string `json:"reason,omitempty" yaml:"reason,omitempty"` // Traceflow phase reason
Source string `json:"source,omitempty" yaml:"source,omitempty"` // Traceflow source, e.g. "default/pod0"
Destination string `json:"destination,omitempty" yaml:"destination,omitempty"` // Traceflow destination, e.g. "default/pod1"
NodeResults []v1beta1.NodeResult `json:"results,omitempty" yaml:"results,omitempty"` // Traceflow node results
CapturedPacket *CapturedPacket `json:"capturedPacket,omitempty" yaml:"capturedPacket,omitempty"` // Captured packet in live-traffic Traceflow
}

func init() {
Expand Down Expand Up @@ -168,12 +168,12 @@ func runE(cmd *cobra.Command, _ []string) error {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err = client.CrdV1alpha1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
if _, err = client.CrdV1beta1().Traceflows().Create(ctx, tf, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("error when creating Traceflow, is Traceflow feature gate enabled? %w", err)
}
defer func() {
if !option.nowait {
if err = client.CrdV1alpha1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
if err = client.CrdV1beta1().Traceflows().Delete(context.TODO(), tf.Name, metav1.DeleteOptions{}); err != nil {
klog.Errorf("error when deleting Traceflow: %+v", err)
}
}
Expand All @@ -183,13 +183,13 @@ func runE(cmd *cobra.Command, _ []string) error {
return nil
}

var res *v1alpha1.Traceflow
var res *v1beta1.Traceflow
err = wait.Poll(1*time.Second, option.timeout, func() (bool, error) {
res, err = client.CrdV1alpha1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
res, err = client.CrdV1beta1().Traceflows().Get(context.TODO(), tf.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if res.Status.Phase != v1alpha1.Succeeded && res.Status.Phase != v1alpha1.Failed {
if res.Status.Phase != v1beta1.Succeeded && res.Status.Phase != v1beta1.Failed {
return false, nil
}
return true, nil
Expand All @@ -210,9 +210,9 @@ func runE(cmd *cobra.Command, _ []string) error {
return err
}

func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
func newTraceflow(client kubernetes.Interface) (*v1beta1.Traceflow, error) {
var srcName, dstName string
var src v1alpha1.Source
var src v1beta1.Source

if option.source != "" {
srcIP := net.ParseIP(option.source)
Expand Down Expand Up @@ -240,7 +240,7 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
srcName = "any"
}

var dst v1alpha1.Destination
var dst v1beta1.Destination
if option.destination != "" {
dstIP := net.ParseIP(option.destination)
if dstIP != nil {
Expand Down Expand Up @@ -285,17 +285,17 @@ func newTraceflow(client kubernetes.Interface) (*v1alpha1.Traceflow, error) {
}

name := getTFName(fmt.Sprintf("%s-to-%s", srcName, dstName))
tf := &v1alpha1.Traceflow{
tf := &v1beta1.Traceflow{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.TraceflowSpec{
Spec: v1beta1.TraceflowSpec{
Source: src,
Destination: dst,
Packet: *pkt,
LiveTraffic: option.liveTraffic,
DroppedOnly: option.droppedOnly,
Timeout: uint16(option.timeout.Seconds()),
Timeout: int32(option.timeout.Seconds()),
},
}
return tf, nil
Expand All @@ -314,18 +314,20 @@ func dstIsPod(client kubernetes.Interface, ns string, name string) (bool, error)
return true, nil
}

func parseFlow() (*v1alpha1.Packet, error) {
func parseFlow() (*v1beta1.Packet, error) {
cleanFlow := strings.ReplaceAll(option.flow, " ", "")
fields, err := getPortFields(cleanFlow)
if err != nil {
return nil, fmt.Errorf("error when parsing the flow: %w", err)
}

var pkt v1alpha1.Packet
var pkt v1beta1.Packet

_, isIPv6 := fields["ipv6"]
if isIPv6 {
pkt.IPv6Header = new(v1alpha1.IPv6Header)
pkt.IPv6Header = new(v1beta1.IPv6Header)
} else {
pkt.IPHeader = new(v1beta1.IPHeader)
}
for k, v := range protocols {
if _, ok := fields[k]; ok {
Expand All @@ -340,28 +342,29 @@ func parseFlow() (*v1alpha1.Packet, error) {
}

if r, ok := fields["tcp_src"]; ok {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
pkt.TransportHeader.TCP.SrcPort = int32(r)
}
if r, ok := fields["tcp_dst"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.DstPort = int32(r)
}
if r, ok := fields["tcp_flags"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP = new(v1beta1.TCPHeader)
}
pkt.TransportHeader.TCP.Flags = int32(r)
tcpFlags := int32(r)
pkt.TransportHeader.TCP.Flags = &tcpFlags
}
if r, ok := fields["udp_src"]; ok {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
pkt.TransportHeader.UDP.SrcPort = int32(r)
}
if r, ok := fields["udp_dst"]; ok {
if pkt.TransportHeader.UDP == nil {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP = new(v1beta1.UDPHeader)
}
pkt.TransportHeader.UDP.DstPort = int32(r)
}
Expand Down Expand Up @@ -390,7 +393,7 @@ func getPortFields(cleanFlow string) (map[string]int, error) {
return fields, nil
}

func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
func output(tf *v1beta1.Traceflow, writer io.Writer) error {
r := Response{
Name: tf.Name,
Phase: tf.Status.Phase,
Expand All @@ -410,7 +413,7 @@ func output(tf *v1alpha1.Traceflow, writer io.Writer) error {
if pkt != nil {
r.CapturedPacket = &CapturedPacket{SrcIP: pkt.SrcIP, DstIP: pkt.DstIP, Length: pkt.Length, IPv6Header: pkt.IPv6Header}
if pkt.IPv6Header == nil {
r.CapturedPacket.IPHeader = &pkt.IPHeader
r.CapturedPacket.IPHeader = pkt.IPHeader
}
if pkt.TransportHeader.TCP != nil || pkt.TransportHeader.UDP != nil || pkt.TransportHeader.ICMP != nil {
r.CapturedPacket.TransportHeader = &pkt.TransportHeader
Expand Down
Loading