Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Status Updater Retrying on Failures #1062

Merged
merged 16 commits into from
Sep 27, 2023
16 changes: 16 additions & 0 deletions internal/framework/status/k8s_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package status

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . K8sUpdater

// K8sUpdater updates a resource from the k8s API.
//
// It is a narrow, single-method view of a status writer: it carries only the
// Update call that the status updater needs, so that call can be replaced by a
// generated fake in tests instead of mocking a whole client.
type K8sUpdater interface {
// Update is from client.StatusClient.SubResourceWriter.
// It writes obj using the given sub-resource update options and returns any
// error reported by the underlying writer.
Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error
}
117 changes: 117 additions & 0 deletions internal/framework/status/statusfakes/fake_k8s_updater.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 83 additions & 27 deletions internal/framework/status/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package status

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/gateway-api/apis/v1beta1"

ngfAPI "github.com/nginxinc/nginx-gateway-fabric/apis/v1alpha1"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Updater
Expand Down Expand Up @@ -64,15 +68,11 @@ type UpdaterConfig struct {
// (b) k8s API can become slow or even timeout. This will increase every update status API call.
// Making UpdaterImpl asynchronous will prevent it from adding variable delays to the event loop.
//
// (3) It doesn't retry on failures. This means there is a chance that some resources will not have up-to-do statuses.
// Statuses are important part of the Gateway API, so we need to ensure that the Gateway always keep the resources
// statuses up-to-date.
//
// (4) It doesn't clear the statuses of a resources that are no longer handled by the Gateway. For example, if
// (3) It doesn't clear the statuses of resources that are no longer handled by the Gateway. For example, if
// an HTTPRoute resource no longer has the parentRef to the Gateway resources, the Gateway must update the status
// of the resource to remove the status about the removed parentRef.
//
// (5) If another controllers changes the status of the Gateway/HTTPRoute resource so that the information set by our
// (4) If another controller changes the status of the Gateway/HTTPRoute resource so that the information set by our
// Gateway is removed, our Gateway will not restore the status until the EventLoop invokes the StatusUpdater as a
// result of processing some other new change to a resource(s).
// FIXME(pleshakov): Make updater production ready
Expand Down Expand Up @@ -179,6 +179,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP

if upd.cfg.UpdateGatewayClassStatus {
for nsname, gcs := range statuses.GatewayClassStatuses {
select {
case <-ctx.Done():
return
default:
}
upd.writeStatuses(ctx, nsname, &v1beta1.GatewayClass{}, func(object client.Object) {
gc := object.(*v1beta1.GatewayClass)
gc.Status = prepareGatewayClassStatus(gcs, upd.cfg.Clock.Now())
Expand All @@ -188,6 +193,11 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP
}

for nsname, gs := range statuses.GatewayStatuses {
select {
case <-ctx.Done():
return
default:
}
upd.writeStatuses(ctx, nsname, &v1beta1.Gateway{}, func(object client.Object) {
gw := object.(*v1beta1.Gateway)
gw.Status = prepareGatewayStatus(gs, upd.cfg.PodIP, upd.cfg.Clock.Now())
Expand All @@ -200,7 +210,6 @@ func (upd *UpdaterImpl) updateGatewayAPI(ctx context.Context, statuses GatewayAP
return
default:
}

upd.writeStatuses(ctx, nsname, &v1beta1.HTTPRoute{}, func(object client.Object) {
hr := object.(*v1beta1.HTTPRoute)
// statuses.GatewayStatus is never nil when len(statuses.HTTPRouteStatuses) > 0
Expand All @@ -219,26 +228,19 @@ func (upd *UpdaterImpl) writeStatuses(
obj client.Object,
statusSetter func(client.Object),
) {
// The function handles errors by reporting them in the logs.
// We need to get the latest version of the resource.
// Otherwise, the Update status API call can fail.
// Note: the default client uses a cache for reads, so we're not making an unnecessary API call here.
// the default is configurable in the Manager options.
if err := upd.cfg.Client.Get(ctx, nsname, obj); err != nil {
if !apierrors.IsNotFound(err) {
upd.cfg.Logger.Error(
err,
"Failed to get the recent version the resource when updating status",
"namespace", nsname.Namespace,
"name", nsname.Name,
"kind", obj.GetObjectKind().GroupVersionKind().Kind)
}
return
}

statusSetter(obj)

if err := upd.cfg.Client.Status().Update(ctx, obj); err != nil {
err := wait.ExponentialBackoffWithContext(
ctx,
wait.Backoff{
Duration: time.Millisecond * 200,
Factor: 2,
Jitter: 0.5,
Steps: 4,
Cap: time.Millisecond * 3000,
},
// Function returns true if the condition is satisfied, or an error if the loop should be aborted.
NewRetryUpdateFunc(upd.cfg.Client, upd.cfg.Client.Status(), nsname, obj, upd.cfg.Logger, statusSetter),
)
if err != nil && !errors.Is(err, context.Canceled) {
upd.cfg.Logger.Error(
err,
"Failed to update status",
Expand All @@ -247,3 +249,57 @@ func (upd *UpdaterImpl) writeStatuses(
"kind", obj.GetObjectKind().GroupVersionKind().Kind)
}
}

// NewRetryUpdateFunc builds the condition function that is passed to
// wait.ExponentialBackoffWithContext when writing a resource status.
// It is exported for testing purposes.
//
// Each invocation of the returned function:
//  1. fetches the current version of the resource identified by nsname into obj,
//  2. applies statusSetter to obj,
//  3. submits obj through updater.
//
// The returned function reports (true, nil) when the update succeeded or when
// the resource was not found (it was deleted, so there is nothing to update).
// Any other failure is logged at verbosity level 1 and reported as (false, nil).
// The error result is deliberately always nil: a non-nil error would abort the
// backoff loop, whereas (false, nil) asks it to try again. The linter flags
// returning nil after observing an error, hence the directive below.
//
//nolint:nilerr
func NewRetryUpdateFunc(
	getter controller.Getter,
	updater K8sUpdater,
	nsname types.NamespacedName,
	obj client.Object,
	logger logr.Logger,
	statusSetter func(client.Object),
) func(ctx context.Context) (bool, error) {
	// logFields assembles the key/value pairs attached to every debug message.
	// The kind is read from obj at call time, not captured once up front.
	logFields := func(cause error) []interface{} {
		return []interface{}{
			"error", cause,
			"namespace", nsname.Namespace,
			"name", nsname.Name,
			"kind", obj.GetObjectKind().GroupVersionKind().Kind,
		}
	}

	return func(ctx context.Context) (bool, error) {
		// Refresh obj first: updating a stale version of the resource can fail.
		// Note: the default client uses a cache for reads, so this is not an
		// unnecessary API call. The default is configurable in the Manager options.
		getErr := getter.Get(ctx, nsname, obj)
		switch {
		case getErr == nil:
			// Resource fetched; continue with the update below.
		case apierrors.IsNotFound(getErr):
			// The resource is gone, so there is no status left to write and
			// no reason to retry.
			return true, nil
		default:
			logger.V(1).Info(
				"Encountered error when getting resource to update status",
				logFields(getErr)...,
			)
			return false, nil
		}

		statusSetter(obj)

		updateErr := updater.Update(ctx, obj)
		if updateErr == nil {
			return true, nil
		}

		logger.V(1).Info(
			"Encountered error updating status",
			logFields(updateErr)...,
		)
		return false, nil
	}
}
75 changes: 75 additions & 0 deletions internal/framework/status/updater_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package status_test

import (
"context"
"errors"
"testing"

. "github.com/onsi/gomega"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/gateway-api/apis/v1beta1"

"github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller/controllerfakes"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status"
"github.com/nginxinc/nginx-gateway-fabric/internal/framework/status/statusfakes"
)

// TestNewRetryUpdateFunc checks the condition function produced by
// status.NewRetryUpdateFunc for each combination of Get/Update outcomes:
// it must never surface an error, and it must report the condition as
// satisfied only on success or when the resource is not found.
func TestNewRetryUpdateFunc(t *testing.T) {
	cases := []struct {
		getReturns         error
		updateReturns      error
		name               string
		expConditionPassed bool
	}{
		{
			name:               "get fails",
			getReturns:         errors.New("failed to get resource"),
			updateReturns:      nil,
			expConditionPassed: false,
		},
		{
			name:               "get fails and apierrors is not found",
			getReturns:         apierrors.NewNotFound(schema.GroupResource{}, "not found"),
			updateReturns:      nil,
			expConditionPassed: true,
		},
		{
			name:               "update fails",
			getReturns:         nil,
			updateReturns:      errors.New("failed to update resource"),
			expConditionPassed: false,
		},
		{
			name:               "nothing fails",
			getReturns:         nil,
			updateReturns:      nil,
			expConditionPassed: true,
		},
	}

	// The fakes are shared by all subtests; each subtest reprograms their
	// return values before building the function under test.
	getterFake := &controllerfakes.FakeGetter{}
	updaterFake := &statusfakes.FakeK8sUpdater{}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			g := NewWithT(t)

			updaterFake.UpdateReturns(tc.updateReturns)
			getterFake.GetReturns(tc.getReturns)

			retryFn := status.NewRetryUpdateFunc(
				getterFake,
				updaterFake,
				types.NamespacedName{},
				&v1beta1.GatewayClass{},
				zap.New(),
				func(client.Object) {},
			)

			passed, err := retryFn(context.Background())

			// The function should always return nil.
			g.Expect(err).ToNot(HaveOccurred())
			g.Expect(passed).To(Equal(tc.expConditionPassed))
		})
	}
}
Loading
Loading