Create e2e test for degraded mode
Create e2e test for degraded mode. When an endpointslice contains an
endpoint with a missing or empty nodeName, the nodeName should be filled
in during degraded mode. When an endpointslice contains an endpoint with
invalid pod information, or the endpoint has an IP that doesn't
correspond to any podIP, the endpoint should be filtered out.
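
For context, a minimal sketch of the behavior the test expects from degraded mode is shown below: a missing nodeName is backfilled from the endpoint's pod, and endpoints that don't resolve to an existing pod, or whose address doesn't match that pod's IP, are dropped. The package, function, and parameter names (sketch, repairEndpoints, podsByName) are invented for illustration and are not the controller's actual implementation.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
)

// repairEndpoints mimics the degraded-mode rules exercised by the test:
// endpoints without usable pod information, backed by a non-existent pod, or
// carrying an address that doesn't match the pod's IP are filtered out, while
// a missing nodeName is filled in from the pod spec.
func repairEndpoints(eps *discoveryv1.EndpointSlice, podsByName map[string]*v1.Pod) []discoveryv1.Endpoint {
	var repaired []discoveryv1.Endpoint
	for _, ep := range eps.Endpoints {
		if ep.TargetRef == nil || ep.TargetRef.Kind != "Pod" {
			continue // no usable pod information: filter out
		}
		pod, ok := podsByName[ep.TargetRef.Name]
		if !ok {
			continue // endpoint doesn't correspond to an existing pod: filter out
		}
		if len(ep.Addresses) == 0 || ep.Addresses[0] != pod.Status.PodIP {
			continue // endpoint IP doesn't correspond to any podIP: filter out
		}
		if ep.NodeName == nil || *ep.NodeName == "" {
			nodeName := pod.Spec.NodeName
			ep.NodeName = &nodeName // fill in the missing nodeName from the pod
		}
		repaired = append(repaired, ep)
	}
	return repaired
}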
sawsa307 committed Apr 12, 2023
1 parent 034ad3f commit 04856fe
Showing 3 changed files with 308 additions and 0 deletions.
135 changes: 135 additions & 0 deletions cmd/e2e-test/neg_test.go
@@ -24,6 +24,7 @@ import (

apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
@@ -778,3 +779,137 @@ func TestNegDisruptive(t *testing.T) {
}
})
}

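// TestDegradedMode creates custom endpoint slices containing deliberately malformed endpoints
// and verifies the resulting NEG sizes. Each case adds int(replicas) custom endpoints on top of
// the echo deployment's endpoints; expectedCount is the total number of network endpoints the
// NEG should converge to once degraded mode has filled in a missing nodeName or filtered out an
// endpoint with invalid pod information or a non-matching IP.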
func TestDegradedMode(t *testing.T) {
t.Parallel()

const (
epsName = "custom-endpointslice"
svcName = "service-1"
customEPSManagedBy = "foo"
replicas = int32(2)
newReplicas = int32(1)
)

for _, tc := range []struct {
desc string
modify func(endpointslice *discoveryv1.EndpointSlice)
expectedCount int
}{
{
desc: "create a custom endpoint slice with 2 endpoints, one endpoint with missing nodeName",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].NodeName = nil
},
expectedCount: 4,
},
{
desc: "create a custom endpoint slice with 2 endpoints, one endpoint corresponds to a non-existent node",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
nonExistentNode := "foo"
endpointslice.Endpoints[0].NodeName = &nonExistentNode
},
expectedCount: 4,
},
{
desc: "create a custom endpoint slice with 2 endpoints, one endpoint is missing pod information",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].TargetRef = nil
},
expectedCount: 3,
},
{
desc: "create a custom endpoint slice with 2 endpoints, one endpoint doesn't correspond to an existing pod",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].TargetRef.Name = "foo"
},
expectedCount: 3,
},
{
desc: "create a custom endpoint slice with 2 endpoints, one endpoint has an IP that doesn't correspond to any podIP",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].Addresses[0] = "1.1.1.1"
},
expectedCount: 3,
},
} {
tc := tc // Capture tc as we are running this in parallel.
Framework.RunWithSandbox(tc.desc, t, func(t *testing.T, s *e2e.Sandbox) {
t.Parallel()
ctx := context.Background()

annotation := annotations.NegAnnotation{
Ingress: false,
ExposedPorts: map[int32]annotations.NegAttributes{
int32(443): {},
int32(80): {},
},
}
svcAnnotations := map[string]string{annotations.NEGAnnotationKey: annotation.String()}
svc, err := e2e.EnsureEchoService(s, svcName, svcAnnotations, v1.ServiceTypeClusterIP, replicas)
if err != nil {
t.Fatalf("error ensuring echo service: %v", err)
}
t.Logf("Echo service ensured (%s/%s)", s.Namespace, svcName)

if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
t.Fatalf("Echo deployment failed to become stable: %v", err)
}

exposedPorts := sets.NewString("80", "443")
// validate neg status
negStatus, err := e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
if err != nil {
t.Fatalf("error waiting for NEG status to update: %v", err)
}
for port, negName := range negStatus.NetworkEndpointGroups {
err := e2e.WaitForNegs(ctx, Framework.Cloud, negName, negStatus.Zones, false, int(replicas))
if err != nil {
t.Fatalf("Error: e2e.WaitForNetworkEndpoints(%s, %s, %v, %d) = %v, want nil", port, negName, negStatus.Zones, int(replicas), err)
}
}

customEPS, err := e2e.EnsureEndpointSlice(s, svc, epsName, customEPSManagedBy, int(replicas), tc.modify)
if err != nil {
t.Fatalf("Error: e2e.EnsureEndpointSlice() = %v, want nil", err)
}
t.Logf("Create custom endpoint slice: %v", customEPS)

_, err = e2e.WaitForEndpointSlice(s, customEPS.ObjectMeta.Name)
if err != nil {
t.Fatalf("Error: e2e.WaitForEndpointSlice() = %v, want nil", err)
}

negStatus, err = e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
if err != nil {
t.Fatalf("error waiting for NEG status to update: %v", err)
}
for port, negName := range negStatus.NetworkEndpointGroups {
err := e2e.WaitForNegs(ctx, Framework.Cloud, negName, negStatus.Zones, false, tc.expectedCount)
if err != nil {
t.Fatalf("Error: e2e.WaitForNetworkEndpoints(%s, %s, %v, %d) = %v, want nil", port, negName, negStatus.Zones, tc.expectedCount, err)
}
}

// scale down to ensure the controller still works properly
if err := e2e.EnsureEchoDeployment(s, svcName, newReplicas, func(deployment *apps.Deployment) {}); err != nil {
t.Fatalf("error ensuring echo deployment: %v", err)
}
if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
t.Fatalf("Echo deployment failed to become stable: %v", err)
}

negStatus, err = e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
if err != nil {
t.Fatalf("error waiting for NEG status to update: %v", err)
}
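// Scaling the echo deployment only changes the deployment-backed endpoints; the standalone
// pods behind the custom endpoint slice are unaffected.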
newExpectedCount := tc.expectedCount - int(replicas) + int(newReplicas)
for port, negName := range negStatus.NetworkEndpointGroups {
err := e2e.WaitForNegs(ctx, Framework.Cloud, negName, negStatus.Zones, false, newExpectedCount)
if err != nil {
t.Fatalf("Error: e2e.WaitForNetworkEndpoints(%s, %s, %v, %d) = %v, want nil", port, negName, negStatus.Zones, int(replicas), err)
}
}
})
}
}
130 changes: 130 additions & 0 deletions pkg/e2e/fixtures.go
@@ -35,13 +35,15 @@ import (
frontendconfig "k8s.io/ingress-gce/pkg/apis/frontendconfig/v1beta1"
sav1 "k8s.io/ingress-gce/pkg/apis/serviceattachment/v1"
"k8s.io/ingress-gce/pkg/e2e/adapter"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
computebeta "google.golang.org/api/compute/v0.beta"
"google.golang.org/api/compute/v1"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -564,3 +566,131 @@ func EnsureServiceAttachment(s *Sandbox, saName, svcName, subnetName string) (*s
func DeleteServiceAttachment(s *Sandbox, saName string) error {
return s.f.SAClient.Delete(s.Namespace, saName)
}

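// EnsureEndpointSlice creates, or updates if it already exists, a custom EndpointSlice for the
// given service. It creates endpointCount standalone pods to back the endpoints and applies the
// modify hook to the slice before it is submitted.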
func EnsureEndpointSlice(s *Sandbox, svc *v1.Service, EPSName, managedByLabel string, endpointCount int, modify func(endpointslice *discoveryv1.EndpointSlice)) (*discoveryv1.EndpointSlice, error) {
tcp := v1.ProtocolTCP
endpointSlice := &discoveryv1.EndpointSlice{
AddressType: "IPv4",
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name + "-" + EPSName,
Namespace: s.Namespace,
Labels: map[string]string{
discoveryv1.LabelServiceName: svc.Name,
discoveryv1.LabelManagedBy: managedByLabel,
},
},
Ports: []discoveryv1.EndpointPort{
{
Name: &svc.Spec.Ports[0].Name,
Port: &svc.Spec.Ports[0].TargetPort.IntVal,
Protocol: &tcp,
},
{
Name: &svc.Spec.Ports[1].Name,
Port: &svc.Spec.Ports[1].TargetPort.IntVal,
Protocol: &tcp,
},
},
Endpoints: []discoveryv1.Endpoint{},
}

for i := 1; i <= endpointCount; i++ {
podName := fmt.Sprintf("pod-%d", i)
pod, err := EnsurePod(s, podName)
if err != nil || pod == nil {
return nil, fmt.Errorf("Error when EnsurePod(%s): %v", podName, err)
}
pod, err = WaitForPodStable(s, podName)
if err != nil || pod == nil {
return nil, fmt.Errorf("Error when WaitForPodStable(%s): %v", podName, err)
}
node, err := s.f.Clientset.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
if err != nil || node == nil {
return nil, fmt.Errorf("Error when getting node %s for pod %s/%s: %v", pod.Spec.NodeName, s.Namespace, podName, err)
}
klog.Infof("Pod %s/%s created on node %s, ip: %s, PodCIDR: %v", s.Namespace, podName, node.ObjectMeta.Name, pod.Status.PodIP, node.Spec.PodCIDRs)

endpoint := discoveryv1.Endpoint{
Addresses: []string{pod.Status.PodIP},
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: s.Namespace,
Name: pod.Name,
UID: pod.ObjectMeta.UID,
},
}
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
}

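// Apply the caller-supplied mutation so the slice carries the malformed endpoint under test.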
modify(endpointSlice)
existingEPS, err := s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Get(context.TODO(), endpointSlice.ObjectMeta.Name, metav1.GetOptions{})
if err != nil || existingEPS == nil {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
}
if !reflect.DeepEqual(existingEPS.Endpoints, endpointSlice.Endpoints) ||
!reflect.DeepEqual(existingEPS.Ports, endpointSlice.Ports) {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
}
return existingEPS, nil
}

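// EnsurePod creates the named echoheaders pod in the sandbox namespace, or updates it if it
// already exists with different metadata.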
func EnsurePod(s *Sandbox, name string) (*v1.Pod, error) {
podSpec := v1.PodSpec{
ReadinessGates: []v1.PodReadinessGate{{ConditionType: shared.NegReadinessGate}},
NodeSelector: map[string]string{"kubernetes.io/os": "linux"},
Containers: []v1.Container{
{
Name: "echoheaders",
Image: echoheadersImage,
Ports: []v1.ContainerPort{
{ContainerPort: 8080, Name: "http-port"},
{ContainerPort: 8443, Name: "https-port"},
},
Env: []v1.EnvVar{
{
Name: app.HostEnvVar,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
{
Name: app.PodEnvVar,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
{
Name: app.NamespaceEnvVar,
ValueFrom: &v1.EnvVarSource{
FieldRef: &v1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
},
},
},
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: s.Namespace,
Name: name,
Labels: map[string]string{
"app": "foo",
},
},
Spec: podSpec,
}

currentPod, err := s.f.Clientset.CoreV1().Pods(s.Namespace).Get(context.TODO(), pod.ObjectMeta.Name, metav1.GetOptions{})
if err != nil || currentPod == nil {
return s.f.Clientset.CoreV1().Pods(s.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
}
if !reflect.DeepEqual(currentPod.ObjectMeta, pod.ObjectMeta) {
return s.f.Clientset.CoreV1().Pods(s.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
}
return currentPod, err
}
43 changes: 43 additions & 0 deletions pkg/e2e/helpers.go
@@ -32,6 +32,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -1104,3 +1105,45 @@ func Truncate(key string) string {
}
return key
}

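// WaitForEndpointSlice waits until the named EndpointSlice can be retrieved from the API server
// and returns it.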
func WaitForEndpointSlice(s *Sandbox, name string) (*discoveryv1.EndpointSlice, error) {
var currentEPS *discoveryv1.EndpointSlice
returnErr := wait.Poll(k8sApiPoolInterval, k8sApiPollTimeout, func() (bool, error) {
var err error
currentEPS, err = s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
if currentEPS == nil || err != nil {
return false, fmt.Errorf("failed to get endpoint slice %s/%s: %v", s.Namespace, name, err)
}
return true, nil
})

return currentEPS, returnErr
}

// WaitForPodStable waits until the pod has been assigned a pod IP and is no longer pending.
func WaitForPodStable(s *Sandbox, name string) (*v1.Pod, error) {
var pod *v1.Pod
returnErr := wait.Poll(k8sApiPoolInterval, k8sApiPollTimeout, func() (bool, error) {
var err error
pod, err = s.f.Clientset.CoreV1().Pods(s.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
if pod == nil || err != nil {
return false, fmt.Errorf("failed to get pod %s/%s: %v", s.Namespace, name, err)
}
if err := CheckPod(pod); err != nil {
klog.Infof("WaitForPodStable(%s/%s) = %v", s.Namespace, name, err)
return false, nil
}
return true, nil
})
return pod, returnErr
}

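// CheckPod returns an error if the pod is still pending or has not been assigned a pod IP.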
func CheckPod(pod *v1.Pod) error {
if pod.Status.PodIP == "" {
return fmt.Errorf("pod not ready: no ip")
}
if pod.Status.Phase == v1.PodPending {
return fmt.Errorf("pod not ready: phase pending")
}
return nil
}
