Skip to content

Commit

Permalink
Fix statefulset Pod recreate case
Browse files Browse the repository at this point in the history
  • Loading branch information
tnqn committed Jun 11, 2020
1 parent f93ba17 commit c970d3a
Show file tree
Hide file tree
Showing 22 changed files with 294 additions and 248 deletions.
4 changes: 2 additions & 2 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ func TestInitstore(t *testing.T) {
if store.Len() != 2 {
t.Errorf("Failed to load OVS port in store")
}
container1, found1 := store.GetContainerInterface("pod1", "ns1")
container1, found1 := store.GetContainerInterface(uuid1)
if !found1 {
t.Errorf("Failed to load OVS port into local store")
} else if container1.OFPort != 1 || container1.IP.String() != p1IP || container1.MAC.String() != p1MAC || container1.InterfaceName != "p1" {
t.Errorf("Failed to load OVS port configuration into local store")
}
_, found2 := store.GetContainerInterface("pod2", "ns2")
_, found2 := store.GetContainerInterface(uuid2)
if !found2 {
t.Errorf("Failed to load OVS port into local store")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/apiserver/handlers/networkpolicy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func HandleFunc(aq querier.AgentQuerier) http.HandlerFunc {
}
} else if pod != "" {
// Query NetworkPolicies applied to the Pod
_, ok := aq.GetInterfaceStore().GetContainerInterface(pod, ns)
if ok {
interfaces := aq.GetInterfaceStore().GetContainerInterfacesByPod(pod, ns)
if len(interfaces) > 0 {
nps := npq.GetAppliedNetworkPolicies(pod, ns)
obj = networkingv1beta1.NetworkPolicyList{Items: nps}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/apiserver/handlers/ovsflows/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ func getTableFlows(aq querier.AgentQuerier, table string) ([]Response, error) {
}

func getPodFlows(aq querier.AgentQuerier, podName, namespace string) ([]Response, error) {
intf, ok := aq.GetInterfaceStore().GetContainerInterface(podName, namespace)
if !ok {
interfaces := aq.GetInterfaceStore().GetContainerInterfacesByPod(podName, namespace)
if len(interfaces) == 0 {
return nil, nil
}

flowKeys := aq.GetOpenflowClient().GetPodFlowKeys(intf.InterfaceName)
flowKeys := aq.GetOpenflowClient().GetPodFlowKeys(interfaces[0].InterfaceName)
return dumpMatchedFlows(aq, flowKeys)

}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/apiserver/handlers/ovsflows/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ func TestPodFlows(t *testing.T) {
if tc.expectedStatus != http.StatusNotFound {
ofc := oftest.NewMockClient(ctrl)
ovsctl := ovsctltest.NewMockOVSCtlClient(ctrl)
i.EXPECT().GetContainerInterface(tc.name, tc.namespace).Return(testInterface, true).Times(1)
i.EXPECT().GetContainerInterfaceByPod(tc.name, tc.namespace).Return([]*interfacestore.InterfaceConfig{testInterface}).Times(1)
ofc.EXPECT().GetPodFlowKeys(testInterface.InterfaceName).Return(testFlowKeys).Times(1)
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
q.EXPECT().GetOVSCtlClient().Return(ovsctl).Times(len(testFlowKeys))
for i := range testFlowKeys {
ovsctl.EXPECT().DumpMatchedFlow(testFlowKeys[i]).Return(testDumpResults[i], nil).Times(1)
}
} else {
i.EXPECT().GetContainerInterface(tc.name, tc.namespace).Return(nil, false).Times(1)
i.EXPECT().GetContainerInterfaceByPod(tc.name, tc.namespace).Return(nil).Times(1)
}

runHTTPTest(t, &tc, q)
Expand Down
12 changes: 8 additions & 4 deletions pkg/agent/apiserver/handlers/ovstracing/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func getPeerAddress(aq querier.AgentQuerier, peer *tracingPeer) (net.IP, *interf
return intf.IP, intf, nil
}

intf, ok := aq.GetInterfaceStore().GetContainerInterface(peer.name, peer.namespace)
if ok {
interfaces := aq.GetInterfaceStore().GetContainerInterfacesByPod(peer.name, peer.namespace)
if len(interfaces) > 0 {
// Local Pod.
return intf.IP, intf, nil
return interfaces[0].IP, interfaces[0], nil
}

// Try getting the Pod from K8s API.
Expand All @@ -115,7 +115,11 @@ func prepareTracingRequest(aq querier.AgentQuerier, req *request) (*ovsctl.Traci
if req.inputPort.ovsPort != "" {
inPort, ok = aq.GetInterfaceStore().GetInterfaceByName(req.inputPort.ovsPort)
} else if req.inputPort.name != "" {
inPort, ok = aq.GetInterfaceStore().GetContainerInterface(req.inputPort.name, req.inputPort.namespace)
interfaces := aq.GetInterfaceStore().GetContainerInterfacesByPod(req.inputPort.name, req.inputPort.namespace)
if len(interfaces) > 0 {
inPort = interfaces[0]
ok = true
}
}
if !ok {
return nil, handlers.NewHandlerError(errors.New("Input port not found"), http.StatusNotFound)
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/apiserver/handlers/ovstracing/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestPodFlows(t *testing.T) {
if tc.expectedStatus == http.StatusNotFound {
q.EXPECT().GetInterfaceStore().Return(i).Times(1)
if tc.port == "pod" {
i.EXPECT().GetContainerInterface("inPod", "inNS").Return(nil, false).Times(1)
i.EXPECT().GetContainerInterfaceByPod("inPod", "inNS").Return(nil).Times(1)
} else {
i.EXPECT().GetInterfaceByName(tc.port).Return(nil, false).Times(1)
}
Expand All @@ -201,12 +201,12 @@ func TestPodFlows(t *testing.T) {
q.EXPECT().GetOpenflowClient().Return(ofc).Times(1)
ofc.EXPECT().GetTunnelVirtualMAC().Return(tunnelVirtualMAC).Times(1)
} else if tc.port == "pod" {
i.EXPECT().GetContainerInterface("inPod", "inNS").Return(inPodInterface, true).Times(1)
i.EXPECT().GetContainerInterfaceByPod("inPod", "inNS").Return([]*interfacestore.InterfaceConfig{inPodInterface}).Times(1)
} else if tc.port != "" {
i.EXPECT().GetInterfaceByName(tc.port).Return(inPodInterface, true).Times(1)
}
i.EXPECT().GetContainerInterface("srcPod", "srcNS").Return(srcPodInterface, true).MaxTimes(1)
i.EXPECT().GetContainerInterface("dstPod", "dstNS").Return(dstPodInterface, true).MaxTimes(1)
i.EXPECT().GetContainerInterfaceByPod("srcPod", "srcNS").Return([]*interfacestore.InterfaceConfig{srcPodInterface}).MaxTimes(1)
i.EXPECT().GetContainerInterfaceByPod("dstPod", "dstNS").Return([]*interfacestore.InterfaceConfig{dstPodInterface}).MaxTimes(1)

if tc.expectedStatus == http.StatusBadRequest {
// "ovs-appctl" won't be executed. OVSCtlClient.Trace() will just
Expand Down
27 changes: 6 additions & 21 deletions pkg/agent/cniserver/interface_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ func newInterfaceConfigurator(ovsDatapathType string) (*ifConfigurator, error) {
// setupInterfaces creates a veth pair: containerIface is in the container
// network namespace and hostIface is in the host network namespace.
func (ic *ifConfigurator) setupInterfaces(
podName, podNamespace, ifname string,
podName, podNamespace, containerID, ifname string,
netns ns.NetNS,
mtu int) (hostIface *current.Interface, containerIface *current.Interface, err error) {
hostVethName := util.GenerateContainerInterfaceName(podName, podNamespace)
hostVethName := util.GenerateContainerInterfaceName(podName, podNamespace, containerID)
hostIface = &current.Interface{}
containerIface = &current.Interface{}

Expand Down Expand Up @@ -80,20 +80,6 @@ func (ic *ifConfigurator) setupInterfaces(
return hostIface, containerIface, nil
}

// configureContainerAddr takes the result of the IPAM plugin, and adds the appropriate IP
// addresses and routes to the interface. It then sends a gratuitous ARP to the network.
func configureContainerAddr(netns ns.NetNS, containerInterface *current.Interface, result *current.Result) error {
if err := netns.Do(func(containerNs ns.NetNS) error {
if err := ipam.ConfigureIface(containerInterface.Name, result); err != nil {
return err
}
return nil
}); err != nil {
return err
}
return nil
}

// advertiseContainerAddr sends 3 GARP packets in another goroutine at 50ms intervals. It's because Openflow entries
// are installed asynchronously, and the gratuitous ARP could be sent out after the Openflow entries are installed.
// Another goroutine is used to ensure that the processing of the CNI ADD request is not blocked.
Expand Down Expand Up @@ -151,18 +137,17 @@ func (ic *ifConfigurator) configureContainerLink(
}
defer netns.Close()
// Create veth pair and link up
hostIface, containerIface, err := ic.setupInterfaces(podName, podNameSpace, containerIFDev, netns, mtu)
hostIface, containerIface, err := ic.setupInterfaces(podName, podNameSpace, containerID, containerIFDev, netns, mtu)
if err != nil {
return fmt.Errorf("failed to create veth devices for container %s: %v", containerID, err)
}

result.Interfaces = []*current.Interface{hostIface, containerIface}

// Note that configuring IP will send gratuitous ARP, it must be executed
// after Pod Openflow entries are installed, otherwise gratuitous ARP would
// be dropped.
klog.V(2).Infof("Configuring IP address for container %s", containerID)
if err = configureContainerAddr(netns, containerIface, result); err != nil {
if err := netns.Do(func(_ ns.NetNS) error {
return ipam.ConfigureIface(containerIface.Name, result)
}); err != nil {
return fmt.Errorf("failed to configure IP address for container %s: %v", containerID, err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/cniserver/interface_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (ic *ifConfigurator) configureContainerLink(
mtu int,
result *current.Result,
) error {
epName := util.GenerateContainerInterfaceName(podName, podNameSpace)
epName := util.GenerateContainerInterfaceName(podName, podNameSpace, containerID)
// Search endpoint from local cache.
endpoint, found := ic.getEndpoint(epName)
if !found {
Expand Down Expand Up @@ -139,7 +139,7 @@ func (ic *ifConfigurator) configureContainerLink(

// createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result.
func (ic *ifConfigurator) createContainerLink(podName string, podNameSpace string, containerID string, result *current.Result) (hostLink *hcsshim.HNSEndpoint, err error) {
epName := util.GenerateContainerInterfaceName(podName, podNameSpace)
epName := util.GenerateContainerInterfaceName(podName, podNameSpace, containerID)

// Create a new Endpoint if not found.
if err := ic.ensureHNSNetwork(); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/agent/cniserver/ipam/ipam_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ func ExecIPAMAdd(cniArgs *cnipb.CniCmdArgs, ipamType string, resultKey string) (
}

func ExecIPAMDelete(cniArgs *cnipb.CniCmdArgs, ipamType string, resultKey string) error {
_, ok := GetIPFromCache(resultKey)
if !ok {
return nil
}
args := argsFromEnv(cniArgs)
driver := ipamDrivers[ipamType]
err := driver.Del(args, cniArgs.NetworkConfiguration)
Expand Down
119 changes: 56 additions & 63 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/interfacestore"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/agent/route"
"github.com/vmware-tanzu/antrea/pkg/agent/util"
"github.com/vmware-tanzu/antrea/pkg/k8s"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
)

Expand Down Expand Up @@ -213,12 +215,12 @@ func (pc *podConfigurator) configureInterfaces(

// Check whether the OVS configuration for the container exists. If it does, return immediately. This check is
// used on Windows, as kubelet on Windows will call CNI Add for the infrastructure container multiple times
// to query IP of Pod. But there should be only one OVS port created for the same Pod. And if the OVS port is added more than
// once, OVS will return an error.
// to query IP of Pod. But there should be only one OVS port created for the same container. And if the OVS port is
// added more than once, OVS will return an error.
// See https://github.com/kubernetes/kubernetes/issues/57253#issuecomment-358897721.
_, found := pc.ifaceStore.GetContainerInterface(podName, podNameSpace)
_, found := pc.ifaceStore.GetContainerInterface(containerID)
if found {
klog.V(2).Infof("Found an existed OVS port with podName %s podNamespace %s, returning", podName, podNameSpace)
klog.V(2).Infof("Found an existing OVS port for container %s, returning", containerID)
// Mark the operation as successful, otherwise the container link might be removed by mistake.
success = true
return nil
Expand Down Expand Up @@ -262,8 +264,8 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
}
}

func (pc *podConfigurator) removeInterfaces(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(podName, podNamespace)
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
return nil
Expand Down Expand Up @@ -341,7 +343,7 @@ func (pc *podConfigurator) validateOVSInterfaceConfig(
containerID, podName, podNamespace string,
containerMAC string,
ips []*current.IPConfig) error {
if containerConfig, found := pc.ifaceStore.GetContainerInterface(podName, podNamespace); found {
if containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID); found {
if containerConfig.MAC.String() != containerMAC {
return fmt.Errorf("interface MAC %s does not match container %s MAC",
containerConfig.MAC.String(), containerID)
Expand Down Expand Up @@ -379,71 +381,62 @@ func parsePrevResult(conf *NetworkConfig) error {
}

func (pc *podConfigurator) reconcile(pods []corev1.Pod) error {
// desiredInterfaces is the exact set of interfaces that should be present, based on the
// current list of Pods.
desiredInterfaces := make(map[string]bool)
// desiredPods is the set of Pods that should be present, based on the
// current list of Pods got from the Kubernetes API.
desiredPods := sets.NewString()
// actualPods is the set of Pods that are present, based on the container
// interfaces got from the OVSDB.
actualPods := sets.NewString()
// knownInterfaces is the list of interfaces currently in the local cache.
knownInterfaces := pc.ifaceStore.GetInterfaceKeysByType(interfacestore.ContainerInterface)
knownInterfaces := pc.ifaceStore.GetInterfacesByType(interfacestore.ContainerInterface)

for _, pod := range pods {
// Skip Pods for which we are not in charge of the networking.
if pod.Spec.HostNetwork {
continue
}

// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
containerConfig, found := pc.ifaceStore.GetContainerInterface(pod.Name, pod.Namespace)
if !found {
// This should not happen since OVSDB is persisted on the Node.
// TODO: is there anything else we should be doing? Assuming that the Pod's
// interface still exists, we can repair the interface store since we can
// retrieve the name of the host interface for the Pod by calling
// GenerateContainerInterfaceName. One thing we would not be able to
// retrieve is the container ID which is part of the container configuration
// we store in the cache, but this ID is not used for anything at the
// moment. However, if the interface does not exist, there is nothing we can
// do since we do not have the original CNI parameters.
klog.Warningf("Interface for Pod %s/%s not found in the interface store", pod.Namespace, pod.Name)
continue
}
klog.V(4).Infof("Syncing interface %s for Pod %s/%s", containerConfig.InterfaceName, pod.Namespace, pod.Name)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IP,
containerConfig.MAC,
pc.gatewayMAC,
uint32(containerConfig.OFPort),
); err != nil {
klog.Errorf("Error when re-installing flows for Pod %s/%s", pod.Namespace, pod.Name)
continue
desiredPods.Insert(k8s.NamespacedName(pod.Namespace, pod.Name))
}

for _, containerConfig := range knownInterfaces {
namespacedName := k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName)
actualPods.Insert(namespacedName)
if desiredPods.Has(namespacedName) {
// This interface matches an existing Pod.
// We rely on the interface cache / store - which is initialized from the persistent
// OVSDB - to map the Pod to its interface configuration. The interface
// configuration includes the parameters we need to replay the flows.
klog.V(4).Infof("Syncing interface %s for Pod %s", containerConfig.InterfaceName, namespacedName)
if err := pc.ofClient.InstallPodFlows(
containerConfig.InterfaceName,
containerConfig.IP,
containerConfig.MAC,
pc.gatewayMAC,
uint32(containerConfig.OFPort),
); err != nil {
klog.Errorf("Error when re-installing flows for Pod %s", namespacedName)
}
} else {
// clean-up and delete interface
klog.V(4).Infof("Deleting interface %s", containerConfig.InterfaceName)
if err := pc.removeInterfaces(containerConfig.ContainerID); err != nil {
klog.Errorf("Failed to delete interface %s: %v", containerConfig.InterfaceName, err)
}
// interface should no longer be in store after the call to removeInterfaces
}
desiredInterfaces[util.GenerateContainerInterfaceKey(pod.Name, pod.Namespace)] = true
}

for _, ifaceID := range knownInterfaces {
if _, found := desiredInterfaces[ifaceID]; found {
// this interface matches an existing Pod.
continue
}
// clean-up and delete interface
containerConfig, found := pc.ifaceStore.GetInterface(ifaceID)
if !found {
// should not happen, nothing should have concurrent access to the interface
// store.
klog.Errorf("Interface %s can no longer be found in the interface store", ifaceID)
continue
}
klog.V(4).Infof("Deleting interface %s", ifaceID)
if err := pc.removeInterfaces(
containerConfig.PodName,
containerConfig.PodNamespace,
containerConfig.ContainerID,
); err != nil {
klog.Errorf("Failed to delete interface %s: %v", ifaceID, err)
}
// interface should no longer be in store after the call to removeInterfaces
for pod := range desiredPods.Difference(actualPods) {
// This should not happen since OVSDB is persisted on the Node.
// TODO: is there anything else we should be doing? Even assuming that the
// Pod's interface still exists, repairing the interface store from the Pod
// alone is not straightforward: the name of the host interface is generated
// by GenerateContainerInterfaceName from the Pod name, the Pod Namespace and
// the container ID, and container interfaces are looked up in the store by
// container ID, which is the one thing we would not be able to retrieve
// here. If the interface does not exist, there is nothing we can do since we
// do not have the original CNI parameters.
klog.Warningf("Interface for Pod %s not found in the interface store", pod)
}
return nil
}
Expand Down Expand Up @@ -542,7 +535,7 @@ func (pc *podConfigurator) connectInterceptedInterface(

// disconnectInterceptedInterface disconnects intercepted interface from ovs br-int.
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(podName, podNamespace)
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
return nil
Expand Down
Loading

0 comments on commit c970d3a

Please sign in to comment.