Skip to content

Commit

Permalink
Send pod name/ns to nodeagent
Browse files Browse the repository at this point in the history
  • Loading branch information
jayanthvn committed Feb 14, 2024
1 parent b6734b0 commit 1db2d2d
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 75 deletions.
99 changes: 99 additions & 0 deletions cmd/routed-eni-cni-plugin/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ import (
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/cniutils"
"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
pb "github.com/aws/amazon-vpc-cni-k8s/rpc"
"github.com/aws/amazon-vpc-cni-k8s/utils"
)

const ipamdAddress = "127.0.0.1:50051"

const npAgentAddress = "127.0.0.1:50052"

const dummyInterfacePrefix = "dummy"

var version string
Expand Down Expand Up @@ -121,6 +124,74 @@ func LoadNetConf(bytes []byte) (*NetConf, logger.Logger, error) {
return &conf, log, nil
}

func cleanupNetworkStack(args *skel.CmdArgs, addReply *pb.AddNetworkReply, rpcClient pb.CNIBackendClient, driverClient driver.NetworkAPIs, version, reason string) error {

cniTypes := typeswrapper.New()

conf, log, err := LoadNetConf(args.StdinData)
if err != nil {
return errors.Wrap(err, "cleanupNetworkStack: error loading config from args")
}

var k8sArgs K8sArgs
if err := cniTypes.LoadArgs(args.Args, &k8sArgs); err != nil {
log.Errorf("cleanupNetworkStack: Failed to load k8s config from arg: %v", err)
return errors.Wrap(err, "cleanupNetworkStack:: failed to load k8s config from arg")
}

r, delErr := rpcClient.DelNetwork(context.Background(), &pb.DelNetworkRequest{
ClientVersion: version,
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
K8S_POD_INFRA_CONTAINER_ID: string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID),
ContainerID: args.ContainerID,
IfName: args.IfName,
NetworkName: conf.Name,
Reason: reason,
})

if delErr != nil || !r.Success {
log.Errorf("cleanupNetworkStack : Failed to call IPAMD to release IP of container %s: GRPC returned %v IPAMD returned %v", args.ContainerID, delErr, r)
return errors.Wrap(err, "cleanup: failed to cleanup IP")
}

// We will let the values in result struct guide us in terms of IP Address Family configured.
var v4Addr, v6Addr, addr *net.IPNet

// We don't support dual stack mode currently so it has to be either v4 or v6 mode.
if addReply.IPv4Addr != "" {
v4Addr = &net.IPNet{
IP: net.ParseIP(addReply.IPv4Addr),
Mask: net.CIDRMask(32, 32),
}
addr = v4Addr
} else if addReply.IPv6Addr != "" {
v6Addr = &net.IPNet{
IP: net.ParseIP(addReply.IPv6Addr),
Mask: net.CIDRMask(128, 128),
}
addr = v6Addr
}

// cleanup pod network
if addReply.PodVlanId != 0 {
if isNetnsEmpty(args.Netns) {
log.Infof("cleanupNetworkStack: Ignoring TeardownPodENI as Netns is empty for SG pod:%s namespace: %s containerID:%s", k8sArgs.K8S_POD_NAME, k8sArgs.K8S_POD_NAMESPACE, k8sArgs.K8S_POD_INFRA_CONTAINER_ID)
return nil
}
err = driverClient.TeardownBranchENIPodNetwork(addr, int(addReply.PodVlanId), conf.PodSGEnforcingMode, log)
} else {
err = driverClient.TeardownPodNetwork(addr, int(addReply.DeviceNumber), log)
}

if err != nil {
log.Errorf("cleanupNetworkStack: Failed on TeardownPodNetwork for container ID %s: %v",
args.ContainerID, err)
return errors.Wrap(err, "cleanupNetworkStack: failed on tear down pod network")
}
return nil
}

func cmdAdd(args *skel.CmdArgs) error {
return add(args, typeswrapper.New(), grpcwrapper.New(), rpcwrapper.New(), driver.New())
}
Expand Down Expand Up @@ -275,6 +346,34 @@ func add(args *skel.CmdArgs, cniTypes typeswrapper.CNITYPES, grpcClient grpcwrap
// dummy interface is appended to PrevResult for use during cleanup
result.Interfaces = append(result.Interfaces, dummyInterface)

if utils.IsStrictMode(r.NetworkPolicyMode) {
// Set up a connection to the ipamD server.
npConn, err := grpcClient.Dial(npAgentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Errorf("Failed to connect to network policy agent: %v", err)
return errors.Wrap(err, "add cmd: failed to connect to network policy agent backend server")
}
defer npConn.Close()

//Make a GRPC call for network policy agent
npc := rpcClient.NewNPBackendClient(npConn)

npr, err := npc.EnforceNpToPod(context.Background(),
&pb.EnforceNpRequest{
K8S_POD_NAME: string(k8sArgs.K8S_POD_NAME),
K8S_POD_NAMESPACE: string(k8sArgs.K8S_POD_NAMESPACE),
})

if err != nil || !npr.Success {
log.Errorf("Failed to setup default network policy Pod Name %s and NameSpace %s: GRPC returned - %v Network policy agent returned - %v",
string(k8sArgs.K8S_POD_NAME), string(k8sArgs.K8S_POD_NAMESPACE), err, npr)
cleanupNetworkStack(args, r, c, driverClient, version, "failed to setup network policy in strict mode")
return errors.New("add cmd: failed to setup network policy in strict mode")
}

log.Debugf("Network Policy agent returned Success : %v", npr.Success)
}

return cniTypes.PrintResult(result, conf.CNIVersion)
}

Expand Down
100 changes: 96 additions & 4 deletions cmd/routed-eni-cni-plugin/cni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestCmdAdd(t *testing.T) {
mockC := mock_rpc.NewMockCNIBackendClient(ctrl)
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum}
addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum, NetworkPolicyMode: "none"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, nil)

v4Addr := &net.IPNet{
Expand All @@ -110,6 +110,98 @@ func TestCmdAdd(t *testing.T) {
assert.Nil(t, err)
}

func TestCmdAddWithNPenabled(t *testing.T) {
ctrl, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork := setup(t)
defer ctrl.Finish()

stdinData, _ := json.Marshal(netConf)

cmdArgs := &skel.CmdArgs{ContainerID: containerID,
Netns: netNS,
IfName: ifName,
StdinData: stdinData}

mocksTypes.EXPECT().LoadArgs(gomock.Any(), gomock.Any()).Return(nil)

conn, _ := grpc.Dial(ipamdAddress, grpc.WithInsecure())

mocksGRPC.EXPECT().Dial(gomock.Any(), gomock.Any()).Return(conn, nil)
mockC := mock_rpc.NewMockCNIBackendClient(ctrl)
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

npConn, _ := grpc.Dial(npAgentAddress, grpc.WithInsecure())

mocksGRPC.EXPECT().Dial(gomock.Any(), gomock.Any()).Return(npConn, nil)
mockNP := mock_rpc.NewMockNPBackendClient(ctrl)
mocksRPC.EXPECT().NewNPBackendClient(npConn).Return(mockNP)

addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum, NetworkPolicyMode: "strict"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, nil)

enforceNpReply := &rpc.EnforceNpReply{Success: true}
mockNP.EXPECT().EnforceNpToPod(gomock.Any(), gomock.Any()).Return(enforceNpReply, nil)

v4Addr := &net.IPNet{
IP: net.ParseIP(addNetworkReply.IPv4Addr),
Mask: net.IPv4Mask(255, 255, 255, 255),
}
mocksNetwork.EXPECT().SetupPodNetwork(gomock.Any(), cmdArgs.IfName, cmdArgs.Netns,
v4Addr, nil, int(addNetworkReply.DeviceNumber), gomock.Any(), gomock.Any()).Return(nil)

mocksTypes.EXPECT().PrintResult(gomock.Any(), gomock.Any()).Return(nil)

err := add(cmdArgs, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork)
assert.Nil(t, err)
}

func TestCmdAddWithNPenabledWithErr(t *testing.T) {
ctrl, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork := setup(t)
defer ctrl.Finish()

stdinData, _ := json.Marshal(netConf)

cmdArgs := &skel.CmdArgs{ContainerID: containerID,
Netns: netNS,
IfName: ifName,
StdinData: stdinData}

mocksTypes.EXPECT().LoadArgs(gomock.Any(), gomock.Any()).Return(nil)

conn, _ := grpc.Dial(ipamdAddress, grpc.WithInsecure())

mocksGRPC.EXPECT().Dial(gomock.Any(), gomock.Any()).Return(conn, nil)
mockC := mock_rpc.NewMockCNIBackendClient(ctrl)
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

npConn, _ := grpc.Dial(npAgentAddress, grpc.WithInsecure())

mocksGRPC.EXPECT().Dial(gomock.Any(), gomock.Any()).Return(npConn, nil)
mockNP := mock_rpc.NewMockNPBackendClient(ctrl)
mocksRPC.EXPECT().NewNPBackendClient(npConn).Return(mockNP)

addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum, NetworkPolicyMode: "strict"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, nil)

enforceNpReply := &rpc.EnforceNpReply{Success: false}
mockNP.EXPECT().EnforceNpToPod(gomock.Any(), gomock.Any()).Return(enforceNpReply, errors.New("Error on EnforceNpReply"))

v4Addr := &net.IPNet{
IP: net.ParseIP(addNetworkReply.IPv4Addr),
Mask: net.IPv4Mask(255, 255, 255, 255),
}
mocksNetwork.EXPECT().SetupPodNetwork(gomock.Any(), cmdArgs.IfName, cmdArgs.Netns,
v4Addr, nil, int(addNetworkReply.DeviceNumber), gomock.Any(), gomock.Any()).Return(nil)

// when setup network policy fails, expect to return IP back to datastore
delNetworkReply := &rpc.DelNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum}
mockC.EXPECT().DelNetwork(gomock.Any(), gomock.Any()).Return(delNetworkReply, nil)

mocksNetwork.EXPECT().TeardownPodNetwork(v4Addr, devNum, gomock.Any()).Return(nil)

err := add(cmdArgs, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork)
assert.Error(t, err)
}

func TestCmdAddNetworkErr(t *testing.T) {
ctrl, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork := setup(t)
defer ctrl.Finish()
Expand All @@ -129,7 +221,7 @@ func TestCmdAddNetworkErr(t *testing.T) {
mockC := mock_rpc.NewMockCNIBackendClient(ctrl)
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

addNetworkReply := &rpc.AddNetworkReply{Success: false, IPv4Addr: ipAddr, DeviceNumber: devNum}
addNetworkReply := &rpc.AddNetworkReply{Success: false, IPv4Addr: ipAddr, DeviceNumber: devNum, NetworkPolicyMode: "none"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, errors.New("Error on AddNetworkReply"))

err := add(cmdArgs, mocksTypes, mocksGRPC, mocksRPC, mocksNetwork)
Expand All @@ -156,7 +248,7 @@ func TestCmdAddErrSetupPodNetwork(t *testing.T) {
mockC := mock_rpc.NewMockCNIBackendClient(ctrl)
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum}
addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, DeviceNumber: devNum, NetworkPolicyMode: "none"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, nil)

addr := &net.IPNet{
Expand Down Expand Up @@ -292,7 +384,7 @@ func TestCmdAddForPodENINetwork(t *testing.T) {
mocksRPC.EXPECT().NewCNIBackendClient(conn).Return(mockC)

addNetworkReply := &rpc.AddNetworkReply{Success: true, IPv4Addr: ipAddr, PodENISubnetGW: "10.0.0.1", PodVlanId: 1,
PodENIMAC: "eniHardwareAddr", ParentIfIndex: 2}
PodENIMAC: "eniHardwareAddr", ParentIfIndex: 2, NetworkPolicyMode: "none"}
mockC.EXPECT().AddNetwork(gomock.Any(), gomock.Any()).Return(addNetworkReply, nil)

addr := &net.IPNet{
Expand Down
20 changes: 20 additions & 0 deletions pkg/ipamd/ipamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ const (
// aws error codes for insufficient IP address scenario
INSUFFICIENT_CIDR_BLOCKS = "InsufficientCidrBlocks"
INSUFFICIENT_FREE_IP_SUBNET = "InsufficientFreeAddressesInSubnet"

// envEnableNetworkPolicy is used to enable IPAMD/CNI to send pod create updates to network policy agent.
envNetworkPolicyMode = "NETWORK_POLICY_ENFORCING_MODE"
defaultNetworkPolicyMode = "none"
)

var log = logger.Get()
Expand Down Expand Up @@ -217,6 +221,7 @@ type IPAMContext struct {
enableManageUntaggedMode bool
enablePodIPAnnotation bool
maxPods int // maximum number of pods that can be scheduled on the node
networkPolicyMode string
}

// setUnmanagedENIs will rebuild the set of ENI IDs for ENIs tagged as "no_manage"
Expand Down Expand Up @@ -347,6 +352,11 @@ func New(k8sClient client.Client) (*IPAMContext, error) {
c.enableManageUntaggedMode = enableManageUntaggedMode()
c.enablePodIPAnnotation = enablePodIPAnnotation()

c.networkPolicyMode, err = getNetworkPolicyMode()
if err != nil {
return nil, err
}

err = c.awsClient.FetchInstanceTypeLimits()
if err != nil {
log.Errorf("Failed to get ENI limits from file:vpc_ip_limits or EC2 for %s", c.awsClient.GetInstanceType())
Expand Down Expand Up @@ -1731,6 +1741,16 @@ func enablePodENI() bool {
return utils.GetBoolAsStringEnvVar(envEnablePodENI, false)
}

func getNetworkPolicyMode() (string, error) {
if value := os.Getenv(envNetworkPolicyMode); value != "" {
if utils.IsValidNetworkPolicyEnforcingMode(value) {
return value, nil
}
return "", errors.New("invalid Network policy mode, supported modes: none, strict, standard")
}
return defaultNetworkPolicyMode, nil
}

func usePrefixDelegation() bool {
return utils.GetBoolAsStringEnvVar(envEnableIpv4PrefixDelegation, false)
}
Expand Down
23 changes: 12 additions & 11 deletions pkg/ipamd/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,18 @@ func (s *server) AddNetwork(ctx context.Context, in *rpc.AddNetworkRequest) (*rp
}
}
resp := rpc.AddNetworkReply{
Success: err == nil,
IPv4Addr: ipv4Addr,
IPv6Addr: ipv6Addr,
DeviceNumber: int32(deviceNumber),
UseExternalSNAT: useExternalSNAT,
VPCv4CIDRs: pbVPCV4cidrs,
VPCv6CIDRs: pbVPCV6cidrs,
PodVlanId: int32(vlanID),
PodENIMAC: branchENIMAC,
PodENISubnetGW: podENISubnetGW,
ParentIfIndex: int32(trunkENILinkIndex),
Success: err == nil,
IPv4Addr: ipv4Addr,
IPv6Addr: ipv6Addr,
DeviceNumber: int32(deviceNumber),
UseExternalSNAT: useExternalSNAT,
VPCv4CIDRs: pbVPCV4cidrs,
VPCv6CIDRs: pbVPCV6cidrs,
PodVlanId: int32(vlanID),
PodENIMAC: branchENIMAC,
PodENISubnetGW: podENISubnetGW,
ParentIfIndex: int32(trunkENILinkIndex),
NetworkPolicyMode: s.ipamContext.networkPolicyMode,
}

log.Infof("Send AddNetworkReply: IPv4Addr: %s, IPv6Addr: %s, DeviceNumber: %d, err: %v", ipv4Addr, ipv6Addr, deviceNumber, err)
Expand Down
Loading

0 comments on commit 1db2d2d

Please sign in to comment.