Skip to content

Commit

Permalink
Add check for attach/detach API error
Browse files Browse the repository at this point in the history
Add a check that detects attach/detach endpoint operations which failed
due to invalid endpoint batch information. When such a failure is
detected, the syncer enters the error state.
  • Loading branch information
sawsa307 committed Nov 23, 2022
1 parent e776188 commit b467552
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
19 changes: 18 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"google.golang.org/api/googleapi"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -372,7 +373,20 @@ func (s *transactionSyncer) invalidEndpointInfo(eds []negtypes.EndpointsData, en
countFromEndpointData += len(ed.Addresses)
}
if countFromEndpointData != countFromPodMap {
s.logger.Info("Detected error when comparing endpoint counts, marking syncer in error state", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
s.logger.Info("Detected error when comparing endpoint counts", "endpointData", eds, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return true
}
return false
}

func (s *transactionSyncer) isInvalidEPBatch(err error, operation transactionOp, networkEndpoints []*composite.NetworkEndpoint) bool {
apiErr, ok := err.(*googleapi.Error)
if !ok {
return false
}
errCode := apiErr.Code
if errCode == 400 {
s.logger.Info("Detected error when sending endpoint batch information, marking syncer in error state", "operation", operation, "errorCode", errCode)
return true
}
return false
Expand Down Expand Up @@ -456,6 +470,9 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
s.recordEvent(apiv1.EventTypeNormal, operation.String(), fmt.Sprintf("%s %d network endpoint(s) (NEG %q in zone %q)", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone))
} else {
s.recordEvent(apiv1.EventTypeWarning, operation.String()+"Failed", fmt.Sprintf("Failed to %s %d network endpoint(s) (NEG %q in zone %q): %v", operation.String(), len(networkEndpointMap), s.NegSyncerKey.NegName, zone, err))
if s.isInvalidEPBatch(err, operation, networkEndpoints) {
s.setErrorState()
}
}

// WARNING: commitTransaction must be called at last for analyzing the operation result
Expand Down
44 changes: 44 additions & 0 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -1765,6 +1767,48 @@ func TestInvalidEndpointInfoEndpointCountsDiffer(t *testing.T) {
}
}

// TestIsInvalidEPBatch checks isInvalidEPBatch against errors produced by a
// mocked AttachNetworkEndpoints call. The mock hook returns a googleapi.Error
// with the HTTP status code of each test case; the test expects false for
// status code 200 and true for status code 400.
func TestIsInvalidEPBatch(t *testing.T) {
	fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
	fakeCloud := negtypes.NewAdapter(fakeGCE)
	// The mock is shared by all sub-tests; each one installs its own hook below.
	mockGCE := fakeGCE.Compute().(*cloud.MockGCE)
	zone := "us-central1-a"
	networkEndpoints := []*composite.NetworkEndpoint{}

	testCases := []struct {
		desc           string
		httpStatusCode int
		expect         bool
	}{
		{
			desc:           "NEG API call no error, status code 200",
			httpStatusCode: 200,
			expect:         false,
		},
		{
			desc:           "NEG API call error, status code 400",
			httpStatusCode: 400,
			expect:         true,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			// Make the attach call return a googleapi.Error carrying the
			// status code under test.
			mockGCE.MockNetworkEndpointGroups.AttachNetworkEndpointsHook = func(ctx context2.Context, key *meta.Key, arg0 *compute.NetworkEndpointGroupsAttachEndpointsRequest, neg *cloud.MockNetworkEndpointGroups) error {
				return &googleapi.Error{
					Code: tc.httpStatusCode,
				}
			}
			_, transactionSyncer := newTestTransactionSyncer(fakeCloud, negtypes.VmIpPortEndpointType, false, true)

			// Feed the error returned by the attach call straight into the
			// function under test.
			err := transactionSyncer.cloud.AttachNetworkEndpoints(transactionSyncer.NegSyncerKey.NegName, zone, networkEndpoints, transactionSyncer.NegSyncerKey.GetAPIVersion())
			if got := transactionSyncer.isInvalidEPBatch(err, attachOp, networkEndpoints); got != tc.expect {
				t.Errorf("isInvalidEPBatch() = %t, expected %t", got, tc.expect)
			}
		})
	}
}

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode, enableEndpointSlices bool) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false, enableEndpointSlices)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO())
Expand Down

0 comments on commit b467552

Please sign in to comment.