Skip to content

Commit

Permalink
Create e2e test for degraded mode
Browse files Browse the repository at this point in the history
Create e2e test for degraded mode. When an endpointslice contains an
endpoint with a missing or empty nodeName, the nodeName should be filled
in during degraded mode. When an endpointslice contains an endpoint with
invalid pod information, or an endpoint whose IP doesn't correspond to
any podIP, that endpoint should be filtered out.
  • Loading branch information
sawsa307 committed May 18, 2023
1 parent 0dab8fd commit 2762336
Show file tree
Hide file tree
Showing 6 changed files with 436 additions and 5 deletions.
197 changes: 196 additions & 1 deletion cmd/e2e-test/neg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (

apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/e2e"
"k8s.io/ingress-gce/pkg/fuzz"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils/common"
)

func TestNEG(t *testing.T) {
Expand All @@ -47,7 +49,7 @@ func testNEGOS(t *testing.T, os e2e.OS) {
serviceName1 = "neg-service1"
serviceName2 = "neg-service2"
ingressName = "neg-ingress1"
replicas = int32(2)
replicas = 2
)
port80 := networkingv1.ServiceBackendPort{Number: 80}

Expand Down Expand Up @@ -875,3 +877,196 @@ func TestLabelPropagation(t *testing.T) {
})
}
}

// TestDegradedMode verifies NEG controller degraded-mode endpoint
// calculation. Endpoints with a missing nodeName, or a nodeName referring to
// a non-existent node, should have the nodeName corrected and be attached;
// endpoints with missing or invalid pod information, or with an IP that does
// not correspond to any podIP, should be filtered out.
func TestDegradedMode(t *testing.T) {
	t.Parallel()

	const (
		customEPSName     = "custom-endpointslice"
		replicas          = 2
		endpointAttach    = 2
		replicasScaleDown = 1
	)

	ctx := context.Background()
	Framework.RunWithSandbox("Degraded Mode", t, func(t *testing.T, s *e2e.Sandbox) {
		t.Parallel()
		// Create a CustomEPS deployment.
		// It has labels not matched to the service, so its pods won't be
		// picked up by the service's own endpoint slice; they are only
		// referenced by the custom endpoint slice created below.
		if err := e2e.EnsureEchoDeployment(s, customEPSName, endpointAttach, e2e.NoopModify); err != nil {
			t.Fatalf("Error: e2e.EnsureEchoDeployment(%s, %d) = %v, want nil", customEPSName, endpointAttach, err)
		}
		if err := e2e.WaitForEchoDeploymentStable(s, customEPSName); err != nil {
			t.Fatalf("Echo deployment failed to become stable: %v", err)
		}

		// Get pods from the CustomEPS deployment.
		// EnsureEchoDeployment() creates pods with label
		// ["app"] = DeploymentName.
		podList, err := e2e.ListPods(s)
		if err != nil {
			t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
		}
		var pods []v1.Pod
		var podIPs []string
		for _, pod := range podList.Items {
			if pod.ObjectMeta.Labels["app"] != customEPSName {
				continue
			}
			pods = append(pods, pod)
			podIPs = append(podIPs, pod.Status.PodIP)
		}

		for _, tc := range []struct {
			desc string
			// modify makes changes to the custom endpoint slice to trigger
			// error state.
			modify func(endpointslice *discoveryv1.EndpointSlice)
			// epInNEGFromCustomEPS are endpoint IPs expected to be in
			// the NEG after endpoints in the custom endpoint slice are
			// successfully attached.
			// This is dependent on the behavior of modify().
			epInNEGFromCustomEPS []string
		}{
			{
				desc: "create a custom endpoint slice with one endpoint with missing nodeName",
				modify: func(endpointslice *discoveryv1.EndpointSlice) {
					endpointslice.Endpoints[0].NodeName = nil
				},
				epInNEGFromCustomEPS: podIPs, // Endpoint's NodeName will be corrected and attached.
			},
			{
				desc: "create a custom endpoint slice with one endpoint corresponds to non-existent node",
				modify: func(endpointslice *discoveryv1.EndpointSlice) {
					nonExistentNode := "foo"
					endpointslice.Endpoints[0].NodeName = &nonExistentNode
				},
				epInNEGFromCustomEPS: podIPs, // Endpoint's NodeName will be corrected and attached.
			},
			{
				desc: "create a custom endpoint slice with one endpoint is missing pod information",
				modify: func(endpointslice *discoveryv1.EndpointSlice) {
					endpointslice.Endpoints[0].TargetRef = nil
				},
				epInNEGFromCustomEPS: podIPs[1:], // Endpoints without pod information won't be attached.
			},
			{
				desc: "create a custom endpoint slice with one endpoint doesn't correspond to a existing pod",
				modify: func(endpointslice *discoveryv1.EndpointSlice) {
					endpointslice.Endpoints[0].TargetRef.Name = "foo"
				},
				epInNEGFromCustomEPS: podIPs[1:], // Endpoints from non-existing pod won't be attached.
			},
			{
				desc: "create a custom endpoint slice with one endpoint has IP doesn't correspond to any PodIP",
				modify: func(endpointslice *discoveryv1.EndpointSlice) {
					endpointslice.Endpoints[0].Addresses[0] = "1.1.1.1"
				},
				epInNEGFromCustomEPS: podIPs[1:], // Endpoints with IPs not matched to pods won't be attached.
			},
		} {
			tc := tc
			// Hash the description so each subtest gets a unique,
			// deterministic service name.
			svcName := fmt.Sprintf("service-%s", common.ContentHash(tc.desc, 8))
			t.Run(tc.desc, func(t *testing.T) {
				// Setup Standalone NEG.
				annotation := annotations.NegAnnotation{
					Ingress: false,
					ExposedPorts: map[int32]annotations.NegAttributes{
						int32(443): {},
						int32(80):  {},
					},
				}
				svcAnnotations := map[string]string{annotations.NEGAnnotationKey: annotation.String()}
				svc, err := e2e.EnsureEchoService(s, svcName, svcAnnotations, v1.ServiceTypeClusterIP, replicas)
				if err != nil {
					t.Fatalf("Error ensuring echo service: %v", err)
				}
				t.Logf("Echo service ensured (%s/%s).\n", s.Namespace, svcName)
				if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
					t.Fatalf("Echo deployment failed to become stable: %v", err)
				}

				// Get podIPs from service deployment.
				podList, err := e2e.ListPods(s)
				if err != nil {
					t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
				}
				var epInNEGFromSvcDeployment []string
				for _, pod := range podList.Items {
					if pod.ObjectMeta.Labels["app"] != svcName {
						continue
					}
					epInNEGFromSvcDeployment = append(epInNEGFromSvcDeployment, pod.Status.PodIP)
				}

				// Verify endpoint IPs before attaching custom endpoint slice.
				exposedPorts := sets.NewString("80", "443")
				negStatus, err := e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
				if err != nil {
					t.Fatalf("Error waiting for NEG status to update: %v", err)
				}
				for port, negName := range negStatus.NetworkEndpointGroups {
					err = e2e.CheckEndpointsInNEG(ctx, Framework.Cloud, negName, negStatus.Zones, epInNEGFromSvcDeployment)
					if err != nil {
						t.Fatalf("Error: e2e.CheckEndpointsInNEG service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
					}
				}

				// Create and add a custom endpoint slice.
				// modify() injects invalid information to the endpoint slice,
				// which would trigger error state.
				// Valid endpoints will be attached, and invalid endpoints will be
				// attached or ignored based on the type of invalid information.
				customEPS, err := e2e.EnsureCustomEndpointSlice(s, svc, customEPSName, pods, tc.modify)
				if err != nil {
					t.Fatalf("Error: e2e.EnsureCustomEndpointSlice() = %v, want nil", err)
				}
				t.Logf("Create custom endpoint slice: %v.\n", customEPS)

				// Verify endpoint IPs after the custom endpoint slice is added.
				// Build the expectation in a freshly allocated slice:
				// appending directly to tc.epInNEGFromCustomEPS (which
				// aliases podIPs) could clobber the shared backing array
				// read by other subtests.
				expectEndpoints := make([]string, 0, len(tc.epInNEGFromCustomEPS)+len(epInNEGFromSvcDeployment))
				expectEndpoints = append(expectEndpoints, tc.epInNEGFromCustomEPS...)
				expectEndpoints = append(expectEndpoints, epInNEGFromSvcDeployment...)
				negStatus, err = e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
				if err != nil {
					t.Fatalf("Error waiting for NEG status to update: %v", err)
				}
				for port, negName := range negStatus.NetworkEndpointGroups {
					err = e2e.CheckEndpointsInNEG(ctx, Framework.Cloud, negName, negStatus.Zones, expectEndpoints)
					if err != nil {
						t.Fatalf("Error: e2e.CheckEndpointsInNEG service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
					}
				}

				// Scale down service deployment to verify if the controller
				// still works properly and detaches removed endpoints.
				if err := e2e.EnsureEchoDeployment(s, svcName, replicasScaleDown, func(deployment *apps.Deployment) {}); err != nil {
					t.Fatalf("Error ensuring echo deployment: %v.", err)
				}
				if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
					t.Fatalf("Echo deployment failed to become stable: %v.", err)
				}
				// Get updated podIPs from service deployment.
				podList, err = e2e.ListPods(s)
				if err != nil {
					t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
				}
				var epInNEGFromSvcDeploymentAfterScaleDown []string
				for _, pod := range podList.Items {
					if pod.ObjectMeta.Labels["app"] != svcName {
						continue
					}
					epInNEGFromSvcDeploymentAfterScaleDown = append(epInNEGFromSvcDeploymentAfterScaleDown, pod.Status.PodIP)
				}

				// Verify endpoint IPs after service deployment is scaled down.
				// Same pre-sized construction as above to avoid aliasing
				// tc.epInNEGFromCustomEPS.
				expectEndpointsAfterScaleDown := make([]string, 0, len(epInNEGFromSvcDeploymentAfterScaleDown)+len(tc.epInNEGFromCustomEPS))
				expectEndpointsAfterScaleDown = append(expectEndpointsAfterScaleDown, epInNEGFromSvcDeploymentAfterScaleDown...)
				expectEndpointsAfterScaleDown = append(expectEndpointsAfterScaleDown, tc.epInNEGFromCustomEPS...)
				for port, negName := range negStatus.NetworkEndpointGroups {
					err = e2e.CheckEndpointsInNEG(ctx, Framework.Cloud, negName, negStatus.Zones, expectEndpointsAfterScaleDown)
					if err != nil {
						t.Fatalf("Error: e2e.CheckEndpointsInNEG service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
					}
				}
			})
		}
	})
}
2 changes: 1 addition & 1 deletion pkg/e2e/adapter/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

// IngressCRUD wraps basic CRUD to allow use of old and new APIs.
type IngressCRUD struct {
	// C is the Kubernetes client used for all Ingress operations. It is an
	// interface rather than a concrete *kubernetes.Clientset so a fake
	// client can be injected — presumably for tests; confirm with callers.
	C kubernetes.Interface
}

// Get Ingress resource.
Expand Down
68 changes: 66 additions & 2 deletions pkg/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"google.golang.org/api/compute/v1"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -142,8 +143,9 @@ func ensureEchoService(s *Sandbox, name string, annotations map[string]string, s
TargetPort: intstr.FromInt(8443),
},
},
Selector: map[string]string{"app": name},
Type: svcType,
Selector: map[string]string{"app": name},
Type: svcType,
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
}
svc, err := s.f.Clientset.CoreV1().Services(s.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expand Down Expand Up @@ -576,3 +578,65 @@ func EnsureServiceAttachment(s *Sandbox, saName, svcName, subnetName string) (*s
func DeleteServiceAttachment(s *Sandbox, saName string) error {
return s.f.SAClient.Delete(s.Namespace, saName)
}

// EnsureCustomEndpointSlice ensures that a custom endpoint slice with the
// given modification is set up. The endpoint slice uses the given list of pods
// as endpoints.
func EnsureCustomEndpointSlice(s *Sandbox, svc *v1.Service, name string, pods []v1.Pod, modify func(endpointslice *discoveryv1.EndpointSlice)) (*discoveryv1.EndpointSlice, error) {
endpointSlice := &discoveryv1.EndpointSlice{
AddressType: discoveryv1.AddressType(svc.Spec.IPFamilies[0]),
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: s.Namespace,
Labels: map[string]string{
discoveryv1.LabelServiceName: svc.Name,
discoveryv1.LabelManagedBy: "foo", // This is a custom endpoint slice.
},
},
}
for _, port := range svc.Spec.Ports {
port := port
endpointPort := discoveryv1.EndpointPort{
Name: &port.Name,
Port: &port.TargetPort.IntVal,
Protocol: &port.Protocol,
}
endpointSlice.Ports = append(endpointSlice.Ports, endpointPort)
}

for _, pod := range pods {
endpoint := discoveryv1.Endpoint{
Addresses: []string{pod.Status.PodIP},
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: s.Namespace,
Name: pod.Name,
UID: pod.ObjectMeta.UID,
},
}
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
}

modify(endpointSlice)
// Create the custom endpoint slice if it is doesn't exist.
existingEPS, err := s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Get(context.TODO(), endpointSlice.ObjectMeta.Name, metav1.GetOptions{})
if err != nil || existingEPS == nil {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
}

// Address type should immutable after creation.
if existingEPS.AddressType != endpointSlice.AddressType {
return nil, fmt.Errorf("endpointSlice %s:%s addressType cannot be modified (existing=%q, new=%q)", existingEPS.Namespace, existingEPS.Name, existingEPS.AddressType, endpointSlice.AddressType)
}
if !reflect.DeepEqual(existingEPS.Endpoints, endpointSlice.Endpoints) ||
!reflect.DeepEqual(existingEPS.Ports, endpointSlice.Ports) {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
}
return existingEPS, nil
}

// ListPods lists all pods in the sandbox namespace.
func ListPods(s *Sandbox) (*v1.PodList, error) {
	podClient := s.f.Clientset.CoreV1().Pods(s.Namespace)
	return podClient.List(context.TODO(), metav1.ListOptions{})
}
Loading

0 comments on commit 2762336

Please sign in to comment.