diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 931058b819..edee8ac6f2 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -24,6 +24,7 @@ import ( "time" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "google.golang.org/api/googleapi" apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -352,6 +353,19 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error { return utilerrors.NewAggregate(errList) } +func (s *transactionSyncer) isInvalidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool { + apiErr, ok := err.(*googleapi.Error) + if !ok { + return false + } + errCode := apiErr.Code + if errCode == 400 { + s.logger.Info("Detected error when sending endpoint batch information, marking syncer in error state", "operation", operation, "errorCode", errCode) + return true + } + return false +} + // syncNetworkEndpoints spins off go routines to execute NEG operations func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error { syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error { @@ -430,6 +444,9 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone)) } else { s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err)) + if s.isInvalidEPBatch(err, operation, networkEndpoints) { + s.setErrorState() + } } // WARNING: commitTransaction must be called at last for analyzing the operation result diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 9d944c36f3..07374935f3 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -27,6 +27,8 @@ import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -1409,6 +1411,48 @@ func TestUnknownNodes(t *testing.T) { } } +func TestIsInvalidEPBatch(t *testing.T) { + fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues()) + fakeCloud := negtypes.NewAdapter(fakeGCE) + zone := "us-central1-a" + networkEndpoints := []*composite.NetworkEndpoint{} + + testCases := []struct { + desc string + HttpStatusCode int + expect bool + }{ + { + desc: "NEG API call no error, status code 200", + HttpStatusCode: 200, + expect: false, + }, + { + desc: "NEG API call error, status code 400", + HttpStatusCode: 400, + expect: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + mockGCE := fakeGCE.Compute().(*cloud.MockGCE) + mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error { + return &googleapi.Error{ + Code: tc.HttpStatusCode, + } + } + _, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true) + + err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion()) + if got := transactionSyncer.isInvalidEPBatch(err, attachOp, networkEndpoints); got != tc.expect { + t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect) + } + }) + } + +} + func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) { negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false, enableEndpointSlices) ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO())