Skip to content

Commit

Permalink
Fix network policy issue in userspace datapath of Flow Exporter (#2194)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yongming Ding authored May 22, 2021
1 parent fa54190 commit 8c3adad
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 11 deletions.
3 changes: 3 additions & 0 deletions pkg/agent/flowexporter/connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (cs *connectionStore) addOrUpdateConn(conn *flowexporter.Connection) {
// IDs stored in the connection label.
if len(conn.Labels) != 0 {
klog.V(4).Infof("connection label: %x; label masks: %x", conn.Labels, conn.LabelsMask)
// We always expect labels from conntrack dumper to be added in little-endian format right now
// In kernel datapath, the labels uses the "native" endianness for the system, which are little-endian
// on most of the modern CPUs based on x86 architecture like Intel, AMD, etc.
ingressOfID := binary.LittleEndian.Uint32(conn.Labels[:4])
egressOfID := binary.LittleEndian.Uint32(conn.Labels[4:8])
if ingressOfID != 0 {
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/flowexporter/connections/conntrack_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type connTrackSystem struct {
connTrack NetFilterConnTrack
}

// TODO: detect the endianness of the system when initializing conntrack dumper to handle situations on big-endian platforms.
// All connection labels are required to store in little endian format in conntrack dumper.
func NewConnTrackSystem(nodeConfig *config.NodeConfig, serviceCIDRv4 *net.IPNet, serviceCIDRv6 *net.IPNet, isAntreaProxyEnabled bool) *connTrackSystem {
if err := SetupConntrackParameters(); err != nil {
// Do not fail, but continue after logging an error as we can still dump flows with missing information.
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/flowexporter/connections/conntrack_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
// Set expect call for mock ovsCtlClient
ovsctlCmdOutput := []byte("tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=127.0.0.1,dst=8.7.6.5,sport=45170,dport=2379,packets=80743,bytes=5416239),reply=(src=8.7.6.5,dst=127.0.0.1,sport=2379,dport=45170,packets=63361,bytes=4811261),start=2020-07-24T05:07:01.591,id=462801621,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86397,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)\n" +
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,mark=33,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
"tcp,orig=(src=100.10.0.105,dst=10.96.0.1,sport=41284,dport=443,packets=343260,bytes=19340621),reply=(src=100.10.0.106,dst=100.10.0.105,sport=6443,dport=41284,packets=381035,bytes=181176472),start=2020-07-25T08:40:08.959,id=982464968,zone=65520,status=SEEN_REPLY|ASSURED|CONFIRMED|DST_NAT|DST_NAT_DONE,timeout=86399,labels=0x200000001,mark=33,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)")
outputFlow := strings.Split(string(ovsctlCmdOutput), "\n")
expConn := &flowexporter.Connection{
ID: 982464968,
Expand Down Expand Up @@ -177,6 +177,7 @@ func TestConnTrackOvsAppCtl_DumpFlows(t *testing.T) {
DestinationPodNamespace: "",
DestinationPodName: "",
TCPState: "ESTABLISHED",
Labels: []byte{1, 0, 0, 0, 2, 0, 0, 0},
}
mockOVSCtlClient.EXPECT().RunAppctlCmd("dpctl/dump-conntrack", false, "-m", "-s").Return(ovsctlCmdOutput, nil)

Expand Down
37 changes: 27 additions & 10 deletions pkg/agent/flowexporter/connections/conntrack_ovs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package connections

import (
"encoding/hex"
"fmt"
"net"
"strconv"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (ct *connTrackOvsCtl) ovsAppctlDumpConnections(zoneFilter uint16) ([]*flowe

// flowStringToAntreaConnection parses the flow string and converts to Antrea connection.
// Example of flow string:
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
// "tcp,orig=(src=127.0.0.1,dst=127.0.0.1,sport=45218,dport=2379,packets=320108,bytes=24615344),reply=(src=127.0.0.1,dst=127.0.0.1,sport=2379,dport=45218,packets=239595,bytes=24347883),start=2020-07-24T05:07:03.998,id=3750535678,status=SEEN_REPLY|ASSURED|CONFIRMED|SRC_NAT_DONE|DST_NAT_DONE,timeout=86399,labels=0x200000001,protoinfo=(state_orig=ESTABLISHED,state_reply=ESTABLISHED,wscale_orig=7,wscale_reply=7,flags_orig=WINDOW_SCALE|SACK_PERM|MAXACK_SET,flags_reply=WINDOW_SCALE|SACK_PERM|MAXACK_SET)"
func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter.Connection, error) {
conn := flowexporter.Connection{}
flowSlice := strings.Split(flow, ",")
Expand Down Expand Up @@ -163,7 +164,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of sport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of sport %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.SourcePort = uint16(val)
Expand All @@ -176,7 +177,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of dport %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of dport %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.TupleOrig.DestinationPort = uint16(val)
Expand All @@ -187,7 +188,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of packets %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of packets %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalPackets = uint64(val)
Expand All @@ -199,7 +200,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 64)
if err != nil {
return nil, fmt.Errorf("conversion of bytes %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of bytes %s to int failed: %v", fields[len(fields)-1], err)
}
if !isReply {
conn.OriginalBytes = uint64(val)
Expand All @@ -213,7 +214,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
timeString := fields[len(fields)-1] + "Z"
val, err := time.Parse(time.RFC3339, timeString)
if err != nil {
return nil, fmt.Errorf("parsing start time %s failed", timeString)
return nil, fmt.Errorf("parsing start time %s failed: %v", timeString, err)
}
conn.StartTime = val
// TODO: We didn't find stoptime related field in flow string right now, need to investigate how stoptime is recorded and dumped.
Expand All @@ -224,7 +225,7 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 16)
if err != nil {
return nil, fmt.Errorf("conversion of zone %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of zone %s to int failed: %v", fields[len(fields)-1], err)
}
if zoneFilter != uint16(val) {
break
Expand All @@ -236,21 +237,37 @@ func flowStringToAntreaConnection(flow string, zoneFilter uint16) (*flowexporter
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of mark '%s' to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of mark '%s' to int failed: %v", fields[len(fields)-1], err)
}
conn.Mark = uint32(val)
case strings.Contains(fs, "timeout"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of timeout %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of timeout %s to int failed: %v", fields[len(fields)-1], err)
}
conn.Timeout = uint32(val)
case strings.Contains(fs, "labels"):
fields := strings.Split(fs, "=")
labelStr := strings.Replace(fields[len(fields)-1], "0x", "", -1)
// Add leading zeros since DecodeString() expects the input string has even length
if len(labelStr) < 16 {
labelStr = strings.Repeat("0", 16-len(labelStr)) + labelStr
}
hexval, err := hex.DecodeString(labelStr)
if err != nil {
return nil, fmt.Errorf("conversion of label string %s to []byte failed: %v", labelStr, err)
}
// Reverse the []byte slice to align with kernel side's result which is little endian
for i := 0; i < len(hexval)/2; i++ {
hexval[i], hexval[len(hexval)-i-1] = hexval[len(hexval)-i-1], hexval[i]
}
conn.Labels = hexval
case strings.Contains(fs, "id"):
fields := strings.Split(fs, "=")
val, err := strconv.ParseUint(fields[len(fields)-1], 10, 32)
if err != nil {
return nil, fmt.Errorf("conversion of id %s to int failed", fields[len(fields)-1])
return nil, fmt.Errorf("conversion of id %s to int failed: %v", fields[len(fields)-1], err)
}
conn.ID = uint32(val)
case strings.Contains(fs, "protoinfo"):
Expand Down

0 comments on commit 8c3adad

Please sign in to comment.