diff --git a/pkg/agent/controller/networkpolicy/audit_logging.go b/pkg/agent/controller/networkpolicy/audit_logging.go index 2c4d335da15..213aec1459b 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging.go +++ b/pkg/agent/controller/networkpolicy/audit_logging.go @@ -206,26 +206,35 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) er } } - // Set match to corresponding ingress/egress reg according to disposition. - match = getMatch(matchers, tableID, disposition) - - if match != nil { - // Get NetworkPolicy full name and OF priority of the conjunction. - conjID, err := getInfoInReg(match, nil) + // Get K8s default deny action, if traffic is default deny, no conjunction could be matched. + if match = getMatchRegField(matchers, openflow.APDenyRegMark.GetField()); match != nil { + cnpDenyRegVal, err := getInfoInReg(match, openflow.APDenyRegMark.GetField().GetRange().ToNXRange()) if err != nil { - return fmt.Errorf("received error while unloading conjunction id from reg: %v", err) - } - ob.npRef, ob.ofPriority, ob.ruleName = c.ofClient.GetPolicyInfoFromConjunction(conjID) - if ob.npRef == "" || ob.ofPriority == "" { - return fmt.Errorf("networkpolicy not found for conjunction id: %v", conjID) + return fmt.Errorf("received error while unloading deny mark from reg: %v", err) } - // Placeholder for K8s NetworkPolicies without rule names. - if ob.ruleName == "" { - ob.ruleName = "" + isK8sDefaultDeny := (cnpDenyRegVal == 0) && (disposition == openflow.DispositionDrop || disposition == openflow.DispositionRej) + if isK8sDefaultDeny { + // For K8s NetworkPolicy implicit drop action, we cannot get Namespace/name. + ob.npRef, ob.ofPriority, ob.ruleName = string(v1beta2.K8sNetworkPolicy), "", "" + return nil } - } else { - // For K8s NetworkPolicy implicit drop action, we cannot get Namespace/name. - ob.npRef, ob.ofPriority, ob.ruleName = string(v1beta2.K8sNetworkPolicy), "", "" + } + + // Set match to corresponding conjunction ID field according to disposition. + match = getMatch(matchers, tableID, disposition) + + // Get NetworkPolicy full name and OF priority of the conjunction. + conjID, err := getInfoInReg(match, nil) + if err != nil { + return fmt.Errorf("received error while unloading conjunction id from reg: %v", err) + } + ob.npRef, ob.ofPriority, ob.ruleName = c.ofClient.GetPolicyInfoFromConjunction(conjID) + if ob.npRef == "" || ob.ofPriority == "" { + return fmt.Errorf("networkpolicy not found for conjunction id: %v", conjID) + } + // Placeholder for K8s NetworkPolicies without rule names. + if ob.ruleName == "" { + ob.ruleName = "" } return nil } diff --git a/pkg/agent/controller/networkpolicy/audit_logging_test.go b/pkg/agent/controller/networkpolicy/audit_logging_test.go index 67af5bd92c8..184457f3115 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging_test.go +++ b/pkg/agent/controller/networkpolicy/audit_logging_test.go @@ -260,7 +260,8 @@ func TestGetNetworkPolicyInfo(t *testing.T) { testK8sRef := "K8sNetworkPolicy:default/test-anp" testPriority, testRule := "61800", "test-rule" allowDispositionData := []byte{0x11, 0x00, 0x00, 0x11} - dropDispositionData := []byte{0x11, 0x00, 0x08, 0x11} + dropCNPDispositionData := []byte{0x11, 0x00, 0x0c, 0x11} + dropK8sDispositionData := []byte{0x11, 0x00, 0x08, 0x11} redirectDispositionData := []byte{0x11, 0x10, 0x00, 0x11} ingressData := []byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11} tests := []struct { @@ -311,7 +312,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) { mockClient.GetPolicyInfoFromConjunction(gomock.Any()).Return( testANPRef, testPriority, testRule) }, - dispositionData: dropDispositionData, + dispositionData: dropCNPDispositionData, wantOb: &logInfo{ tableName: openflow.AntreaPolicyIngressRuleTable.GetName(), disposition: actionDrop, @@ -323,7 +324,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) { { name: "K8s Drop", tableID: openflow.IngressDefaultTable.GetID(), - dispositionData: dropDispositionData, + dispositionData: dropK8sDispositionData, wantOb: &logInfo{ tableName: openflow.IngressDefaultTable.GetName(), disposition: actionDrop, @@ -359,7 +360,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) { if tc.expectedCalls != nil { regID := openflow.TFIngressConjIDField.GetRegID() if tc.wantOb.disposition == actionDrop { - regID = openflow.CNPConjIDField.GetRegID() + regID = openflow.APConjIDField.GetRegID() } ingressMatch := generateMatch(regID, ingressData) matchers = append(matchers, ingressMatch) diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index abe0dbd0973..e56901ce645 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -78,7 +78,7 @@ func getMatchRegField(matchers *ofctrl.Matchers, field *binding.RegField) *ofctr func getMatch(matchers *ofctrl.Matchers, tableID uint8, disposition uint32) *ofctrl.MatchField { // Get match from CNPDenyConjIDReg if disposition is Drop or Reject. if disposition == openflow.DispositionDrop || disposition == openflow.DispositionRej { - return getMatchRegField(matchers, openflow.CNPConjIDField) + return getMatchRegField(matchers, openflow.APConjIDField) } // Get match from ingress/egress reg if disposition is Allow or Pass. for _, table := range append(openflow.GetAntreaPolicyEgressTables(), openflow.EgressRuleTable) { diff --git a/pkg/agent/controller/traceflow/packetin.go b/pkg/agent/controller/traceflow/packetin.go index fe7681b62b6..766e56212a0 100644 --- a/pkg/agent/controller/traceflow/packetin.go +++ b/pkg/agent/controller/traceflow/packetin.go @@ -228,7 +228,7 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (*crdv1alpha1.Tracefl // Get drop table. if tableID == openflow.EgressMetricTable.GetID() || tableID == openflow.IngressMetricTable.GetID() { ob := getNetworkPolicyObservation(tableID, tableID == openflow.IngressMetricTable.GetID()) - if match := getMatchRegField(matchers, openflow.CNPConjIDField); match != nil { + if match := getMatchRegField(matchers, openflow.APConjIDField); match != nil { notAllowConjInfo, err := getRegValue(match, nil) if err != nil { return nil, nil, nil, err diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index ef30679fa57..e0f8fa1702c 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -57,8 +57,9 @@ var ( // reg0[9]: Field to indicate whether the packet's source / destination MAC address needs to be rewritten. RewriteMACRegMark = binding.NewOneBitRegMark(0, 9) NotRewriteMACRegMark = binding.NewOneBitZeroRegMark(0, 9) - // reg0[10]: Mark to indicate the packet is denied(Drop/Reject). - CnpDenyRegMark = binding.NewOneBitRegMark(0, 10) + // reg0[10]: Mark to indicate the packet is denied(Drop/Reject) for Antrea Policy. + // K8s default drop will not be recorded in this reg. + APDenyRegMark = binding.NewOneBitRegMark(0, 10) // reg0[11..12]: Field to indicate disposition of Antrea Policy. It could have more bits to support more dispositions // that Antrea Policy support in the future. Marks in this field include: // - 0b00: allow @@ -100,9 +101,9 @@ var ( // reg3(NXM_NX_REG3) // Field to store the selected Service Endpoint IP EndpointIPField = binding.NewRegField(3, 0, 31) - // Field to store the conjunction ID which is for rule in CNP. It shares the same register with EndpointIPField, + // Field to store the conjunction ID which is for rule in Antrea Policy. It shares the same register with EndpointIPField, // since the service selection will finish when a packet hitting NetworkPolicy related rules. - CNPConjIDField = binding.NewRegField(3, 0, 31) + APConjIDField = binding.NewRegField(3, 0, 31) // reg4(NXM_NX_REG4) // reg4[0..15]: Field to store the selected Service Endpoint port. diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index b629aa42a14..90384198019 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -479,18 +479,18 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { Action().CT(true, IngressMetricTable.GetID(), CtZone, nil).LoadToLabelField(10, IngressRuleCTLabel).CTDone().Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchConjID(11). - Action().LoadToRegField(CNPConjIDField, 11). - Action().LoadRegMark(CnpDenyRegMark). + Action().LoadToRegField(APConjIDField, 11). + Action().LoadRegMark(APDenyRegMark). Action().GotoTable(IngressMetricTable.GetID()).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). MatchConjID(12). - Action().LoadToRegField(CNPConjIDField, 12). - Action().LoadRegMark(CnpDenyRegMark). + Action().LoadToRegField(APConjIDField, 12). + Action().LoadRegMark(APDenyRegMark). Action().GotoTable(IngressMetricTable.GetID()).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). MatchConjID(13). - Action().LoadToRegField(CNPConjIDField, 13). - Action().LoadRegMark(CnpDenyRegMark). + Action().LoadToRegField(APConjIDField, 13). + Action().LoadRegMark(APDenyRegMark). Action().GotoTable(IngressMetricTable.GetID()).Done(), AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). @@ -546,13 +546,13 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { MatchProtocol(binding.ProtocolIP).MatchCTStateNew(false).MatchCTLabelField(0, 10, IngressRuleCTLabel). Action().NextTable().Done(), IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 11). + MatchRegMark(APDenyRegMark).MatchRegFieldWithValue(APConjIDField, 11). Action().Drop().Done(), IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 12). + MatchRegMark(APDenyRegMark).MatchRegFieldWithValue(APConjIDField, 12). Action().Drop().Done(), IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 13). + MatchRegMark(APDenyRegMark).MatchRegFieldWithValue(APConjIDField, 13). Action().Drop().Done(), IngressDefaultTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). MatchTunnelID(uint64(UnknownLabelIdentity)). diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index e49db455199..87783408026 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -1667,7 +1667,7 @@ func (f *featureNetworkPolicy) allowRulesMetricFlows(conjunctionID uint32, ingre if metricTable == MulticastEgressMetricTable || metricTable == MulticastIngressMetricTable { flow := metricTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchRegFieldWithValue(CNPConjIDField, conjunctionID). + MatchRegFieldWithValue(APConjIDField, conjunctionID). Action().GotoTable(metricTable.GetNext()). Done() flows = append(flows, flow) @@ -1696,8 +1696,8 @@ func (f *featureNetworkPolicy) denyRuleMetricFlow(conjunctionID uint32, ingress } return metricTable.ofTable.BuildFlow(priorityNormal). Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchRegMark(CnpDenyRegMark). - MatchRegFieldWithValue(CNPConjIDField, conjunctionID). + MatchRegMark(APDenyRegMark). + MatchRegFieldWithValue(APConjIDField, conjunctionID). Action().Drop(). Done() } @@ -1826,7 +1826,7 @@ func (f *featureNetworkPolicy) conjunctionActionFlow(conjunctionID uint32, table // Any matched flow will be resubmitted to next table in corresponding metric tables. if f.enableMulticast && (tableID == MulticastEgressRuleTable.GetID() || tableID == MulticastIngressRuleTable.GetID()) { flow := table.BuildFlow(ofPriority).MatchConjID(conjunctionID). - Action().LoadToRegField(CNPConjIDField, conjunctionID). + Action().LoadToRegField(APConjIDField, conjunctionID). Action().NextTable(). Cookie(f.cookieAllocator.Request(f.category).Raw()). Done() @@ -1858,8 +1858,8 @@ func (f *featureNetworkPolicy) conjunctionActionDenyFlow(conjunctionID uint32, t } flowBuilder := table.BuildFlow(ofPriority). MatchConjID(conjunctionID). - Action().LoadToRegField(CNPConjIDField, conjunctionID). - Action().LoadRegMark(CnpDenyRegMark) + Action().LoadToRegField(APConjIDField, conjunctionID). + Action().LoadRegMark(APDenyRegMark) var customReason int if f.enableDenyTracking { diff --git a/test/e2e/antreapolicy_test.go b/test/e2e/antreapolicy_test.go index bde91bcd197..d2921216c98 100644 --- a/test/e2e/antreapolicy_test.go +++ b/test/e2e/antreapolicy_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/strings/slices" "antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface" crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" @@ -2589,63 +2590,19 @@ func testAuditLoggingBasic(t *testing.T, data *TestData) { } // nodeName is guaranteed to be set at this stage, since the framework waits for all Pods to be in Running phase nodeName := podXA.Spec.NodeName - antreaPodName, err := data.getAntreaPodOnNode(nodeName) - if err != nil { - t.Errorf("Error occurred when trying to get the Antrea Agent Pod running on Node %s: %v", nodeName, err) - } - cmd := []string{"cat", logDir + logfileName} - - if err := wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) { - stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, "antrea-agent", cmd) - if err != nil || stderr != "" { - // file may not exist yet - t.Logf("Error when printing the audit log file, err: %v, stderr: %v", err, stderr) - return false, nil - } - if !strings.Contains(stdout, "test-log-acnp-deny") { - t.Logf("Audit log file does not contain entries for 'test-log-acnp-deny' yet") - return false, nil - } - - destinations := []string{namespaces["z"] + "/a", namespaces["z"] + "/b", namespaces["z"] + "/c"} - srcIPs, _ := podIPs[namespaces["x"]+"/a"] - var expectedNumEntries, actualNumEntries int - for _, d := range destinations { - dstIPs, _ := podIPs[d] - for i := 0; i < len(srcIPs); i++ { - for j := 0; j < len(dstIPs); j++ { - // only look for an entry in the audit log file if srcIP and - // dstIP are of the same family - if strings.Contains(srcIPs[i], ".") != strings.Contains(dstIPs[j], ".") { - continue - } - expectedNumEntries += 1 - // The audit log should contain log entry `... Drop ...` - re := regexp.MustCompile(npRef + ` ` + ruleName + ` Drop [0-9]+ ` + srcIPs[i] + ` [0-9]+ ` + dstIPs[j] + ` ` + strconv.Itoa(int(p80))) - if re.MatchString(stdout) { - actualNumEntries += 1 - } else { - t.Logf("Audit log does not contain expected entry for x/a (%s) to %s (%s)", srcIPs[i], d, dstIPs[j]) - } - break - } - } - } - if actualNumEntries != expectedNumEntries { - t.Logf("Missing entries in audit log: expected %d but found %d", expectedNumEntries, actualNumEntries) - return false, nil - } - return true, nil - }); err != nil { - t.Errorf("Error when polling audit log files for required entries: %v", err) + srcIPs, _ := podIPs[namespaces["x"]+"/a"] + destIPs := append(podIPs[namespaces["z"]+"/a"], append(podIPs[namespaces["z"]+"/b"], podIPs[namespaces["z"]+"/c"]...)...) + expectedLogPrefix := func(_ string) string { + return npRef + ` ` + ruleName + ` Drop [0-9]+ ` } + checkAuditLoggingResult(t, data, nodeName, npRef, srcIPs, destIPs, expectedLogPrefix) failOnError(k8sUtils.CleanACNPs(), t) } -// testAuditLoggingEnableNP tests that audit logs are generated when K8s NP is applied +// testAuditLoggingEnableK8s tests that audit logs are generated when K8s NP is applied // tests both Allow traffic by K8s NP and Drop traffic by implicit K8s policy drop -func testAuditLoggingEnableNP(t *testing.T, data *TestData) { +func testAuditLoggingEnableK8s(t *testing.T, data *TestData) { failOnError(data.updateNamespaceWithAnnotations(namespaces["x"], map[string]string{networkpolicy.EnableNPLoggingAnnotationKey: "true"}), t) // Add a K8s namespaced NetworkPolicy in ns x that allow ingress traffic from // Pod x/b to x/a which default denies other ingress including from Pod x/c to x/a @@ -2680,58 +2637,93 @@ func testAuditLoggingEnableNP(t *testing.T, data *TestData) { } // nodeName is guaranteed to be set at this stage, since the framework waits for all Pods to be in Running phase nodeName := podXA.Spec.NodeName - antreaPodName, err := data.getAntreaPodOnNode(nodeName) + srcIPs := append(podIPs[namespaces["x"]+"/b"], podIPs[namespaces["x"]+"/c"]...) + destIPs, _ := podIPs[namespaces["x"]+"/a"] + expectedLogPrefix := func(srcIP string) string { + if slices.Contains(podIPs[namespaces["x"]+"/b"], srcIP) { + return npRef + " Allow [0-9]+ " + } else if slices.Contains(podIPs[namespaces["x"]+"/c"], srcIP) { + return "K8sNetworkPolicy Drop " + } + return "" + } + checkAuditLoggingResult(t, data, nodeName, "K8sNetworkPolicy", srcIPs, destIPs, expectedLogPrefix) + + failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], "allow-x-b-to-x-a"), t) + failOnError(data.UpdateNamespace(namespaces["x"], func(namespace *v1.Namespace) { + delete(namespace.Annotations, networkpolicy.EnableNPLoggingAnnotationKey) + }), t) +} + +// testAuditLoggingK8sService tests that audit logs are generated for K8s Service access +// tests both Allow traffic by K8s NP and Drop traffic by implicit K8s policy drop +func testAuditLoggingK8sService(t *testing.T, data *TestData) { + failOnError(data.updateNamespaceWithAnnotations(namespaces["x"], map[string]string{networkpolicy.EnableNPLoggingAnnotationKey: "true"}), t) + + // Create and expose nginx service on the same node as pod x/a + podXA, err := k8sUtils.GetPodByLabel(namespaces["x"], "a") if err != nil { - t.Errorf("Error occurred when trying to get the Antrea Agent Pod running on Node %s: %v", nodeName, err) + t.Errorf("Failed to get Pod in Namespace x with label 'pod=a': %v", err) } - cmd := []string{"cat", logDir + logfileName} + serverNode := podXA.Spec.NodeName + serviceName := "nginx" + _, serverIP, nginxCleanupFunc := createAndWaitForPod(t, data, data.createNginxPodOnNode, "test-server-", serverNode, namespaces["x"], false) + defer nginxCleanupFunc() + serverPort := int32(80) + ipFamily := v1.IPv4Protocol + if !strings.Contains(podIPs[namespaces["x"]+"/a"][0], ".") { + ipFamily = v1.IPv6Protocol + } + service, err := data.CreateService(serviceName, namespaces["x"], serverPort, serverPort, map[string]string{"app": "nginx"}, false, false, v1.ServiceTypeClusterIP, &ipFamily) + if err != nil { + t.Fatalf("Error when creating nginx service: %v", err) + } + defer k8sUtils.DeleteService(service.Namespace, service.Name) - if err := wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) { - stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, "antrea-agent", cmd) - if err != nil || stderr != "" { - // file may not exist yet - t.Logf("Error when printing the audit log file, err: %v, stderr: %v", err, stderr) - return false, nil - } - if !strings.Contains(stdout, "K8sNetworkPolicy") { - t.Logf("Audit log file does not contain entries for 'test-log-acnp-deny' yet") - return false, nil - } + // Add a K8s namespaced NetworkPolicy in ns x that allow ingress traffic from + // Pod x/a to service nginx which default denies other ingress including from Pod x/b to service nginx + npRef := "allow-xa-to-service" + k8sNPBuilder := &NetworkPolicySpecBuilder{} + k8sNPBuilder = k8sNPBuilder.SetName(namespaces["x"], npRef). + SetPodSelector(map[string]string{"app": serviceName}). + SetTypeIngress(). + AddIngress(v1.ProtocolTCP, &p80, nil, nil, nil, + map[string]string{"pod": "a"}, nil, nil, nil) - var expectedNumEntries, actualNumEntries int - srcPods := []string{namespaces["x"] + "/b", namespaces["x"] + "/c"} - expectedLogPrefix := []string{npRef + " Allow [0-9]+ ", "K8sNetworkPolicy Drop "} - destIPs, _ := podIPs[namespaces["x"]+"/a"] - for i := 0; i < len(srcPods); i++ { - srcIPs, _ := podIPs[srcPods[i]] - for _, srcIP := range srcIPs { - for _, destIP := range destIPs { - // only look for an entry in the audit log file if srcIP and - // dstIP are of the same family - if strings.Contains(srcIP, ".") != strings.Contains(destIP, ".") { - continue - } - expectedNumEntries += 1 - // The audit log should contain log entry `... Drop ...` - re := regexp.MustCompile(expectedLogPrefix[i] + srcIP + ` [0-9]+ ` + destIP + ` ` + strconv.Itoa(int(p80))) - if re.MatchString(stdout) { - actualNumEntries += 1 - } else { - t.Logf("Audit log does not contain expected entry from %s (%s) to x/a (%s)", srcPods[i], srcIP, destIP) - } - break - } - } - } - if actualNumEntries != expectedNumEntries { - t.Logf("Missing entries in audit log with K8s NP: expected %d but found %d", expectedNumEntries, actualNumEntries) - return false, nil + knp, err := k8sUtils.CreateOrUpdateNetworkPolicy(k8sNPBuilder.Get()) + failOnError(err, t) + failOnError(waitForResourceReady(t, timeout, knp), t) + + // generate some traffic that wget the nginx service + var wg sync.WaitGroup + oneProbe := func(pod *v1.Pod) { + wg.Add(1) + go func() { + defer wg.Done() + data.runWgetCommandFromTestPodWithRetry(pod.Name, pod.Namespace, pod.Spec.Containers[0].Name, serviceName, 5) + }() + } + oneProbe(podXA) + podXB, err := k8sUtils.GetPodByLabel(namespaces["x"], "b") + if err != nil { + t.Errorf("Failed to get Pod in Namespace x with label 'pod=b': %v", err) + } + oneProbe(podXB) + wg.Wait() + + srcIPs := []string{podIPs[namespaces["x"]+"/a"][0], podIPs[namespaces["x"]+"/b"][0]} + destIPs := serverIP.ipStrings + expectedLogPrefix := func(srcIP string) string { + if slices.Contains(podIPs[namespaces["x"]+"/a"], srcIP) { + return npRef + " Allow [0-9]+ " + } else if slices.Contains(podIPs[namespaces["x"]+"/b"], srcIP) { + return "K8sNetworkPolicy Drop " } - return true, nil - }); err != nil { - t.Errorf("Error when polling audit log files for required entries: %v", err) + return "" } - failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], "allow-x-b-to-x-a"), t) + checkAuditLoggingResult(t, data, serverNode, "K8sNetworkPolicy", srcIPs, destIPs, expectedLogPrefix) + + failOnError(k8sUtils.DeleteNetworkPolicy(namespaces["x"], npRef), t) failOnError(data.UpdateNamespace(namespaces["x"], func(namespace *v1.Namespace) { delete(namespace.Annotations, networkpolicy.EnableNPLoggingAnnotationKey) }), t) @@ -3982,6 +3974,54 @@ func testACNPMulticastEgress(t *testing.T, data *TestData, acnpName, caseName, g } } +func checkAuditLoggingResult(t *testing.T, data *TestData, nodeName, logLocator string, srcIPs, destIPs []string, expectedLogPrefix func(string) string) { + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + t.Errorf("Error occurred when trying to get the Antrea Agent Pod running on Node %s: %v", nodeName, err) + } + cmd := []string{"cat", logDir + logfileName} + + if err := wait.Poll(1*time.Second, 10*time.Second, func() (bool, error) { + stdout, stderr, err := data.RunCommandFromPod(antreaNamespace, antreaPodName, "antrea-agent", cmd) + if err != nil || stderr != "" { + // file may not exist yet + t.Logf("Error when printing the audit log file, err: %v, stderr: %v", err, stderr) + return false, nil + } + if !strings.Contains(stdout, logLocator) { + t.Logf("Audit log file does not contain entries for '%s' yet", logLocator) + return false, nil + } + + var expectedNumEntries, actualNumEntries int + for _, srcIP := range srcIPs { + for _, destIP := range destIPs { + // only look for an entry in the audit log file if srcIP and + // dstIP are of the same family + if strings.Contains(srcIP, ".") != strings.Contains(destIP, ".") { + continue + } + expectedNumEntries += 1 + // The audit log should contain log entry `... ...` + re := regexp.MustCompile(expectedLogPrefix(srcIP) + srcIP + ` [0-9]+ ` + destIP + ` ` + strconv.Itoa(int(p80))) + if re.MatchString(stdout) { + actualNumEntries += 1 + } else { + t.Logf("Audit log does not contain expected entry from client (%s) to server (%s)", srcIP, destIP) + } + break + } + } + if actualNumEntries != expectedNumEntries { + t.Logf("Missing entries in audit log: expected %d but found %d", expectedNumEntries, actualNumEntries) + return false, nil + } + return true, nil + }); err != nil { + t.Errorf("Error when polling audit log files for required entries: %v", err) + } +} + func generatePacketCaptureCmd(t *testing.T, data *TestData, timeout int, hostIP, nodeName, podName string) (string, error) { agentPodName := getAntreaPodName(t, data, nodeName) cmds := []string{"antctl", "get", "podinterface", podName, "-n", data.testNamespace, "-o", "json"} @@ -4334,7 +4374,8 @@ func TestAntreaPolicy(t *testing.T) { t.Run("TestGroupAuditLogging", func(t *testing.T) { t.Run("Case=AuditLoggingBasic", func(t *testing.T) { testAuditLoggingBasic(t, data) }) - t.Run("Case=AuditLoggingEnableNP", func(t *testing.T) { testAuditLoggingEnableNP(t, data) }) + t.Run("Case=AuditLoggingEnableK8s", func(t *testing.T) { testAuditLoggingEnableK8s(t, data) }) + t.Run("Case=AuditLoggingK8sService", func(t *testing.T) { testAuditLoggingK8sService(t, data) }) }) t.Run("TestMulticastNP", func(t *testing.T) { diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 8c9296d6657..26b5c0da5a1 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -2067,12 +2067,16 @@ func (data *TestData) runNetcatCommandFromTestPodWithProtocol(podName string, ns return fmt.Errorf("nc stdout: <%v>, stderr: <%v>, err: <%v>", stdout, stderr, err) } -func (data *TestData) runWgetCommandOnBusyboxWithRetry(podName, ns string, url string, maxAttempts int) (string, string, error) { +func (data *TestData) runWgetCommandOnBusyboxWithRetry(podName string, ns string, url string, maxAttempts int) (string, string, error) { + return data.runWgetCommandFromTestPodWithRetry(podName, ns, busyboxContainerName, url, maxAttempts) +} + +func (data *TestData) runWgetCommandFromTestPodWithRetry(podName string, ns string, containerName string, url string, maxAttempts int) (string, string, error) { var stdout, stderr string var err error cmd := []string{"wget", "-O", "-", url, "-T", "1"} for i := 0; i < maxAttempts; i++ { - stdout, stderr, err = data.RunCommandFromPod(ns, podName, busyboxContainerName, cmd) + stdout, stderr, err = data.RunCommandFromPod(ns, podName, containerName, cmd) if err != nil { if i < maxAttempts-1 { time.Sleep(time.Second) diff --git a/test/e2e/k8s_util.go b/test/e2e/k8s_util.go index 42d12591900..7b231b06a90 100644 --- a/test/e2e/k8s_util.go +++ b/test/e2e/k8s_util.go @@ -364,10 +364,10 @@ func (k *KubernetesUtils) Probe(ns1, pod1, ns2, pod2 string, port int32, protoco } toPod := toPods[0] fromPodName, toPodName := fmt.Sprintf("%s/%s", ns1, pod1), fmt.Sprintf("%s/%s", ns2, pod2) - return k.prodeAndDecideConnectivity(fromPod, toPod, fromPodName, toPodName, port, protocol) + return k.probeAndDecideConnectivity(fromPod, toPod, fromPodName, toPodName, port, protocol) } -func (k *KubernetesUtils) prodeAndDecideConnectivity(fromPod, toPod v1.Pod, +func (k *KubernetesUtils) probeAndDecideConnectivity(fromPod, toPod v1.Pod, fromPodName, toPodName string, port int32, protocol utils.AntreaPolicyProtocol) (PodConnectivityMark, error) { // Both IPv4 and IPv6 address should be tested. connectivity := Unknown