Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Degraded mode e2e #2066

Merged
merged 1 commit into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 198 additions & 1 deletion cmd/e2e-test/neg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (

apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/e2e"
"k8s.io/ingress-gce/pkg/fuzz"
"k8s.io/ingress-gce/pkg/neg/types/shared"
"k8s.io/ingress-gce/pkg/utils/common"
)

func TestNEG(t *testing.T) {
Expand All @@ -47,7 +49,7 @@ func testNEGOS(t *testing.T, os e2e.OS) {
serviceName1 = "neg-service1"
serviceName2 = "neg-service2"
ingressName = "neg-ingress1"
replicas = int32(2)
replicas = 2
)
port80 := networkingv1.ServiceBackendPort{Number: 80}

Expand Down Expand Up @@ -875,3 +877,198 @@ func TestLabelPropagation(t *testing.T) {
})
}
}

func TestDegradedMode(t *testing.T) {
t.Parallel()

const (
customEPSName = "custom-endpointslice"
replicas = 2
endpointAttach = 2
replicasScaleDown = 1
)

ctx := context.Background()
Framework.RunWithSandbox("Degraded Mode", t, func(t *testing.T, s *e2e.Sandbox) {
t.Parallel()
// Create a CustomEPS deployment.
// It has labels not matched to the service,
// so it won't be used by the service endpoint slice.
if err := e2e.EnsureEchoDeployment(s, customEPSName, endpointAttach, e2e.NoopModify); err != nil {
t.Fatalf("Error: e2e.EnsureEchoDeployment(%s, %d) = %v, want nil", customEPSName, endpointAttach, err)
}
if err := e2e.WaitForEchoDeploymentStable(s, customEPSName); err != nil {
t.Fatalf("Echo deployment failed to become stable: %v", err)
}

// Get pods from the CustomEPS deployment.
// EnsureEchoDeployment() creates pods with label
// ["app"] = DeploymentName
podList, err := e2e.ListPods(s)
if err != nil {
t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
}
var pods []v1.Pod
var podIPs []string
for _, pod := range podList.Items {
if pod.ObjectMeta.Labels["app"] != customEPSName {
continue
}
pods = append(pods, pod)
podIPs = append(podIPs, pod.Status.PodIP)
}

for _, tc := range []struct {
desc string
// modify makes changes to the custom endpoint slice to trigger
// error state
modify func(endpointslice *discoveryv1.EndpointSlice)
// epInNEGFromCustomEPS are endpoints IPs expected to be in
// the NEG after endpoints in the custom endpoint slice are
// successfully attached.
// This is dependent on the behavior of modify()
epInNEGFromCustomEPS []string
}{
{
desc: "create a custom endpoint slice with one endpoint with missing nodeName",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].NodeName = nil
},
epInNEGFromCustomEPS: podIPs, // Endpoint's NodeName will be corrected and attached.
},
{
desc: "create a custom endpoint slice with one endpoint corresponds to non-existent node",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
nonExistentNode := "foo"
endpointslice.Endpoints[0].NodeName = &nonExistentNode
},
epInNEGFromCustomEPS: podIPs, // Endpoint's NodeName will be corrected and attached.
},
{
desc: "create a custom endpoint slice with one endpoint is missing pod information",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].TargetRef = nil
},
epInNEGFromCustomEPS: podIPs[1:], // Endpoints without pod information won't be attached.
},
{
desc: "create a custom endpoint slice with one endpoint doesn't correspond to a existing pod",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].TargetRef.Name = "foo"
},
epInNEGFromCustomEPS: podIPs[1:], // Endpoints from non-existing pod won't be attached.
},
{
desc: "create a custom endpoint slice with one endpoint has IP doesn't correspond to any PodIP",
modify: func(endpointslice *discoveryv1.EndpointSlice) {
endpointslice.Endpoints[0].Addresses[0] = "1.1.1.1"
},
epInNEGFromCustomEPS: podIPs[1:], // Endpoints with IPs not matched to pods won't be attached.
},
} {
tc := tc
svcName := fmt.Sprintf("service-%s", common.ContentHash(tc.desc, 8))
t.Run(tc.desc, func(t *testing.T) {
// Setup Standalone NEG.
annotation := annotations.NegAnnotation{
Ingress: false,
ExposedPorts: map[int32]annotations.NegAttributes{
int32(443): {},
int32(80): {},
},
}
svcAnnotations := map[string]string{annotations.NEGAnnotationKey: annotation.String()}
svc, err := e2e.EnsureEchoService(s, svcName, svcAnnotations, v1.ServiceTypeClusterIP, replicas)
if err != nil {
t.Fatalf("Error ensuring echo service: %v", err)
}
t.Logf("Echo service ensured (%s/%s).\n", s.Namespace, svcName)
if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
t.Fatalf("Echo deployment failed to become stable: %v", err)
}

// Verify endpoint IPs by ensuring NEG endpoints are from
// the service deployment.
podList, err := e2e.ListPods(s)
if err != nil {
t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
}
var epInNEGFromSvcDeployment []string
// Only select podIPs from service deployment pods.
for _, pod := range podList.Items {
if pod.ObjectMeta.Labels["app"] != svcName {
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
continue
}
epInNEGFromSvcDeployment = append(epInNEGFromSvcDeployment, pod.Status.PodIP)
}

// Verify endpoint IPs before attaching custom endpoint slice.
exposedPorts := sets.NewString("80", "443")
negStatus, err := e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
if err != nil {
t.Fatalf("Error waiting for NEG status to update: %v", err)
}
for port, negName := range negStatus.NetworkEndpointGroups {
err = e2e.CheckNEGEndpointIPs(ctx, Framework.Cloud, negName, negStatus.Zones, epInNEGFromSvcDeployment)
if err != nil {
t.Fatalf("Error: e2e.CheckNEGEndpointIPs service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
}
}

// Create and add a custom endpoint slice.
// modify() injects invalid information to the endpoint slice,
// which would trigger error state.
// Valid endpoints will be attached, and invalid endpoints will be
// attached or ignored based on the type of invalid information.
customEPS, err := e2e.EnsureCustomEndpointSlice(s, svc, customEPSName, pods, tc.modify)
if err != nil {
t.Fatalf("Error: e2e.EnsureCustomEndpointSlice() = %v, want nil", err)
}
t.Logf("Create custom endpoint slice: %v.\n", customEPS)

// Verify endpoint IPs after the custom endpoint slice is added.
expectEndpoints := append(tc.epInNEGFromCustomEPS, epInNEGFromSvcDeployment...)
negStatus, err = e2e.WaitForNegStatus(s, svcName, exposedPorts.List(), false)
if err != nil {
t.Fatalf("Error waiting for NEG status to update: %v", err)
}
for port, negName := range negStatus.NetworkEndpointGroups {
err = e2e.CheckNEGEndpointIPs(ctx, Framework.Cloud, negName, negStatus.Zones, expectEndpoints)
if err != nil {
t.Fatalf("Error: e2e.CheckNEGEndpointIPs service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
}
}

// Scale down service deployment to verify if the controller
// still works properly and detach removed endpoints.
if err := e2e.EnsureEchoDeployment(s, svcName, replicasScaleDown, func(deployment *apps.Deployment) {}); err != nil {
t.Fatalf("Error ensuring echo deployment: %v", err)
}
if err := e2e.WaitForEchoDeploymentStable(s, svcName); err != nil {
t.Fatalf("Echo deployment failed to become stable: %v", err)
}
// Get updated podIPs from service deployment.
podList, err = e2e.ListPods(s)
if err != nil {
t.Fatalf("Error: e2e.ListPods() = %v, want nil", err)
}
var epInNEGFromSvcDeploymentAfterScaleDown []string
for _, pod := range podList.Items {
if pod.ObjectMeta.Labels["app"] != svcName {
continue
}
epInNEGFromSvcDeploymentAfterScaleDown = append(epInNEGFromSvcDeploymentAfterScaleDown, pod.Status.PodIP)
}

// Verify endpoint IPs after service deployment is scaled down.
expectEndpointsAfterScaleDown := append(epInNEGFromSvcDeploymentAfterScaleDown, tc.epInNEGFromCustomEPS...)
for port, negName := range negStatus.NetworkEndpointGroups {
err = e2e.CheckNEGEndpointIPs(ctx, Framework.Cloud, negName, negStatus.Zones, expectEndpointsAfterScaleDown)
if err != nil {
t.Fatalf("Error: e2e.CheckNEGEndpointIPs service %s/%s neg port/name %s/%s: %v", svcName, s.Namespace, port, negName, err)
}
}
})
}
})
}
2 changes: 1 addition & 1 deletion pkg/e2e/adapter/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (

// IngressCRUD wraps basic CRUD to allow use of old and new APIs.
type IngressCRUD struct {
C *kubernetes.Clientset
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
C kubernetes.Interface
}

// Get Ingress resource.
Expand Down
68 changes: 66 additions & 2 deletions pkg/e2e/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"google.golang.org/api/compute/v1"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -142,8 +143,9 @@ func ensureEchoService(s *Sandbox, name string, annotations map[string]string, s
TargetPort: intstr.FromInt(8443),
},
},
Selector: map[string]string{"app": name},
Type: svcType,
Selector: map[string]string{"app": name},
Type: svcType,
IPFamilies: []v1.IPFamily{v1.IPv4Protocol},
},
}
svc, err := s.f.Clientset.CoreV1().Services(s.Namespace).Get(context.TODO(), name, metav1.GetOptions{})
Expand Down Expand Up @@ -576,3 +578,65 @@ func EnsureServiceAttachment(s *Sandbox, saName, svcName, subnetName string) (*s
func DeleteServiceAttachment(s *Sandbox, saName string) error {
return s.f.SAClient.Delete(s.Namespace, saName)
}

// EnsureCustomEndpointSlice ensures that a custom endpoint slice with the
// given modification is set up. The endpoint slice uses the given list of pods
// as endpoints.
func EnsureCustomEndpointSlice(s *Sandbox, svc *v1.Service, name string, pods []v1.Pod, modify func(endpointslice *discoveryv1.EndpointSlice)) (*discoveryv1.EndpointSlice, error) {
endpointSlice := &discoveryv1.EndpointSlice{
AddressType: discoveryv1.AddressType(svc.Spec.IPFamilies[0]),
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: s.Namespace,
Labels: map[string]string{
discoveryv1.LabelServiceName: svc.Name,
discoveryv1.LabelManagedBy: "foo", // This is a custom endpoint slice.
},
},
}
for _, port := range svc.Spec.Ports {
port := port
endpointPort := discoveryv1.EndpointPort{
Name: &port.Name,
Port: &port.TargetPort.IntVal,
Protocol: &port.Protocol,
}
endpointSlice.Ports = append(endpointSlice.Ports, endpointPort)
}

for _, pod := range pods {
endpoint := discoveryv1.Endpoint{
Addresses: []string{pod.Status.PodIP},
NodeName: &pod.Spec.NodeName,
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: s.Namespace,
Name: pod.Name,
UID: pod.ObjectMeta.UID,
},
}
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpoint)
}

modify(endpointSlice)
// Create the custom endpoint slice if it is doesn't exist.
existingEPS, err := s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Get(context.TODO(), endpointSlice.ObjectMeta.Name, metav1.GetOptions{})
if err != nil || existingEPS == nil {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
}

// Address type should immutable after creation.
if existingEPS.AddressType != endpointSlice.AddressType {
return nil, fmt.Errorf("endpointSlice %s:%s addressType cannot be modified (existing=%q, new=%q)", existingEPS.Namespace, existingEPS.Name, existingEPS.AddressType, endpointSlice.AddressType)
}
if !reflect.DeepEqual(existingEPS.Endpoints, endpointSlice.Endpoints) ||
!reflect.DeepEqual(existingEPS.Ports, endpointSlice.Ports) {
return s.f.Clientset.DiscoveryV1().EndpointSlices(s.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
}
return existingEPS, nil
}

// ListPods lists all pods in the sandbox namespace.
func ListPods(s *Sandbox) (*v1.PodList, error) {
return s.f.Clientset.CoreV1().Pods(s.Namespace).List(context.TODO(), metav1.ListOptions{})
}
Loading