Skip to content

Commit

Permalink
[flexible-ipam] Fix IP annotation not work on a StatefulSet
Browse files Browse the repository at this point in the history
Signed-off-by: gran <gran@vmware.com>
  • Loading branch information
gran-vmv committed Nov 21, 2023
1 parent bdf2d6b commit 8167259
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 14 deletions.
36 changes: 29 additions & 7 deletions pkg/controller/ipam/antrea_ipam_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"strings"
"time"

Expand Down Expand Up @@ -253,31 +254,48 @@ func (c *AntreaIPAMController) cleanIPPoolForStatefulSet(namespacedName string)
}

// Find IP Pools annotated to StatefulSet via direct annotation or Namespace annotation
func (c *AntreaIPAMController) getIPPoolsForStatefulSet(ss *appsv1.StatefulSet) []string {
func (c *AntreaIPAMController) getIPPoolsForStatefulSet(ss *appsv1.StatefulSet) ([]string, []net.IP) {

// Inspect IP annotation for the Pods
ipStrings, _ := ss.Spec.Template.Annotations[annotation.AntreaIPAMPodIPAnnotationKey]
ipStrings = strings.ReplaceAll(ipStrings, " ", "")
var ips []net.IP
if ipStrings != "" {
splittedIPStrings := strings.Split(ipStrings, annotation.AntreaIPAMAnnotationDelimiter)
for _, ipString := range splittedIPStrings {
ip := net.ParseIP(ipString)
if ipString != "" && ip == nil {
klog.Errorf("invalid IP annotation %s", ipStrings)
ips = nil
break
}
ips = append(ips, ip)
}
}

// Inspect pool annotation for the Pods
// In order to avoid extra API call in IPAM driver, IPAM annotations are defined
// on Pods rather than on StatefulSet
annotations, exists := ss.Spec.Template.Annotations[annotation.AntreaIPAMAnnotationKey]
if exists {
// Stateful Set Pod is annotated with dedicated IP pool
return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter)
return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter), ips
}

// Inspect Namespace
namespace, err := c.namespaceLister.Get(ss.Namespace)
if err != nil {
// Should never happen
klog.Errorf("Namespace %s not found for StatefulSet %s", ss.Namespace, ss.Name)
return nil
return nil, nil
}

annotations, exists = namespace.Annotations[annotation.AntreaIPAMAnnotationKey]
if exists {
return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter)
return strings.Split(annotations, annotation.AntreaIPAMAnnotationDelimiter), ips
}

return nil
return nil, nil

}

Expand All @@ -287,7 +305,11 @@ func (c *AntreaIPAMController) getIPPoolsForStatefulSet(ss *appsv1.StatefulSet)
func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.StatefulSet) error {
klog.InfoS("Processing create notification", "Namespace", ss.Namespace, "StatefulSet", ss.Name)

ipPools := c.getIPPoolsForStatefulSet(ss)
ipPools, ips := c.getIPPoolsForStatefulSet(ss)
var ip net.IP
if len(ips) > 0 {
ip = ips[0]
}

if ipPools == nil {
// nothing to preallocate
Expand All @@ -310,7 +332,7 @@ func (c *AntreaIPAMController) preallocateIPPoolForStatefulSet(ss *appsv1.Statef
// in the pool. This safeguards us from double allocation in case agent allocated IP by the time
// controller task is executed. Note also that StatefulSet resize will not be handled.
if size > 0 {
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size)
err = allocator.AllocateStatefulSet(ss.Namespace, ss.Name, size, ip)
if err != nil {
return fmt.Errorf("failed to preallocate continuous IP space of size %d from Pool %s: %s", size, ipPoolName, err)
}
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/ipam/antrea_ipam_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ package ipam
import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -282,3 +284,62 @@ func TestReleaseStaleAddresses(t *testing.T) {

require.NoError(t, err)
}

func TestAntreaIPAMController_getIPPoolsForStatefulSet(t *testing.T) {
tests := []struct {
name string
prepareFunc func(*appsv1.StatefulSet)
want bool
want1 []net.IP
}{
{
name: "no annotation",
prepareFunc: func(sts *appsv1.StatefulSet) {
delete(sts.Spec.Template.Annotations, annotation.AntreaIPAMAnnotationKey)
},
want: false,
want1: nil,
},
{
name: "ippool",
prepareFunc: func(sts *appsv1.StatefulSet) {},
want: true,
want1: nil,
},
{
name: "valid ip",
prepareFunc: func(sts *appsv1.StatefulSet) {
sts.Spec.Template.Annotations[annotation.AntreaIPAMPodIPAnnotationKey] = "10.2.2.109"
},
want: true,
want1: []net.IP{net.ParseIP("10.2.2.109")},
},
{
name: "invalid ip",
prepareFunc: func(sts *appsv1.StatefulSet) {
sts.Spec.Template.Annotations[annotation.AntreaIPAMPodIPAnnotationKey] = "10.2.2.109, a.b.c.d"
},
want: true,
want1: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
namespace, pool, statefulSet := initTestObjects(false, true, 1)
tt.prepareFunc(statefulSet)
controller := newFakeAntreaIPAMController(pool, namespace, statefulSet)
controller.informerFactory.Start(stopCh)
controller.crdInformerFactory.Start(stopCh)

got, got1 := controller.getIPPoolsForStatefulSet(statefulSet)
var want []string
if tt.want {
want = []string{pool.Name}
}
assert.Equalf(t, want, got, "getIPPoolsForStatefulSet()")
assert.Equalf(t, tt.want1, got1, "getIPPoolsForStatefulSet()")
})
}
}
10 changes: 8 additions & 2 deletions pkg/ipam/poolallocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase,
// This functionality is useful when StatefulSet does not have a dedicated IP Pool assigned.
// It returns error if such range is not available. In this case IPs for the StatefulSet will
// be allocated on the fly, and there is no guarantee for continuous IPs.
func (a *IPPoolAllocator) AllocateStatefulSet(namespace, name string, size int) error {
func (a *IPPoolAllocator) AllocateStatefulSet(namespace, name string, size int, ip net.IP) error {
// Retry on CRD update conflict which is caused by multiple agents updating a pool at same time.
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ipPool, allocators, err := a.getPoolAndInitIPAllocators()
Expand All @@ -450,7 +450,13 @@ func (a *IPPoolAllocator) AllocateStatefulSet(namespace, name string, size int)
}
}

ips, err := allocators.AllocateRange(size)
var ips []net.IP
if size == 1 && ip != nil {
err = allocators.AllocateIP(ip)
ips = []net.IP{ip}
} else {
ips, err = allocators.AllocateRange(size)
}
if err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion pkg/ipam/poolallocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func TestAllocateReleaseStatefulSet(t *testing.T) {
}

allocator := newTestIPPoolAllocator(&pool, stopCh)
err := allocator.AllocateStatefulSet(testNamespace, setName, 7)
err := allocator.AllocateStatefulSet(testNamespace, setName, 7, nil)
require.NoError(t, err)

// Make sure reserved IPs are respected for next allocate
Expand All @@ -433,4 +433,22 @@ func TestAllocateReleaseStatefulSet(t *testing.T) {

// Make sure reserved IPs are released
validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.100"})

allocator = newTestIPPoolAllocator(&pool, stopCh)
err = allocator.AllocateStatefulSet(testNamespace, setName, 1, net.ParseIP("10.2.2.101"))
require.NoError(t, err)

// Make sure specified IP is reserved
validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.100", "10.2.2.102"})

// Release the set
err = allocator.ReleaseStatefulSet(testNamespace, setName)
require.NoError(t, err)

// Make sure reserved IP is released
validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.101"})

// Invalid IP will result in an error
err = allocator.AllocateStatefulSet(testNamespace, setName, 1, net.ParseIP("10.2.3.103"))
require.Error(t, err)
}
40 changes: 36 additions & 4 deletions test/e2e/antreaipam_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,35 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey
}
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

stsName2 := randName("sts-test-")
ipOffsets = []int32{3}
size = len(ipOffsets)
reservedIPOffsets = ipOffsets
startIPString := subnetIPv4RangesMap[testAntreaIPAMNamespace].Spec.IPRanges[0].Start
offset := int(ipOffsets[0])
if dedicatedIPPoolKey != nil {
startIPString = subnetIPv4RangesMap[*dedicatedIPPoolKey].Spec.IPRanges[0].Start
}
expectedPodIP := utilnet.AddIPOffset(utilnet.BigForIP(net.ParseIP(startIPString)), offset)
mutateFunc = func(sts *appsv1.StatefulSet) {
if sts.Spec.Template.Annotations == nil {
sts.Spec.Template.Annotations = map[string]string{}
}
if dedicatedIPPoolKey != nil {
sts.Spec.Template.Annotations[annotation.AntreaIPAMAnnotationKey] = ipPoolName
}
sts.Spec.Template.Annotations[annotation.AntreaIPAMPodIPAnnotationKey] = expectedPodIP.String()
}
_, cleanup2, err := data.createStatefulSet(stsName2, testAntreaIPAMNamespace, int32(size), agnhostContainerName, agnhostImage, []string{"sleep", "3600"}, nil, mutateFunc)
if err != nil {
t.Fatalf("Error when creating StatefulSet '%s': %v", stsName2, err)
}
defer cleanup2()
if err := data.waitForStatefulSetPods(defaultTimeout, stsName2, testAntreaIPAMNamespace); err != nil {
t.Fatalf("Error when waiting for StatefulSet Pods to get IPs: %v", err)
}
checkStatefulSetIPPoolAllocation(t, data, stsName2, testAntreaIPAMNamespace, ipPoolName, ipOffsets, reservedIPOffsets)

podName := randName("test-standalone-pod-")
podAnnotations := map[string]string{}
if dedicatedIPPoolKey != nil {
Expand All @@ -349,12 +378,12 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey
if err != nil {
t.Fatalf("Error when checking IPPoolAllocation: %v", err)
}
startIPString := subnetIPv4RangesMap[testAntreaIPAMNamespace].Spec.IPRanges[0].Start
offset := 2
startIPString = subnetIPv4RangesMap[testAntreaIPAMNamespace].Spec.IPRanges[0].Start
offset = 2
if dedicatedIPPoolKey != nil {
startIPString = subnetIPv4RangesMap[*dedicatedIPPoolKey].Spec.IPRanges[0].Start
}
expectedPodIP := utilnet.AddIPOffset(utilnet.BigForIP(net.ParseIP(startIPString)), offset)
expectedPodIP = utilnet.AddIPOffset(utilnet.BigForIP(net.ParseIP(startIPString)), offset)
assert.True(t, isBelongTo)
assert.True(t, reflect.DeepEqual(ipAddressState, &crdv1alpha2.IPAddressState{
IPAddress: expectedPodIP.String(),
Expand All @@ -368,7 +397,7 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey
},
}))

ipOffsets = []int32{0, 1, 3}
ipOffsets = []int32{0, 1, 4}
size = len(ipOffsets)
reservedIPOffsets = ipOffsets
_, err = data.updateStatefulSetSize(stsName, testAntreaIPAMNamespace, int32(size))
Expand All @@ -393,6 +422,9 @@ func testAntreaIPAMStatefulSet(t *testing.T, data *TestData, dedicatedIPPoolKey

cleanup()
checkStatefulSetIPPoolAllocation(t, data, stsName, testAntreaIPAMNamespace, ipPoolName, nil, nil)

cleanup2()
checkStatefulSetIPPoolAllocation(t, data, stsName2, testAntreaIPAMNamespace, ipPoolName, nil, nil)
}

func checkStatefulSetIPPoolAllocation(tb testing.TB, data *TestData, name string, namespace string, ipPoolName string, ipOffsets, reservedIPOffsets []int32) {
Expand Down

0 comments on commit 8167259

Please sign in to comment.