Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce a strict upgrade order between stack components #3537

Merged
merged 21 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/controller/apmserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (r *ReconcileApmServer) doReconcile(ctx context.Context, request reconcile.
return reconcile.Result{}, err
}
logger := log.WithValues("namespace", as.Namespace, "as_name", as.Name)
if !association.AllowVersion(*asVersion, as, logger) {
if !association.AllowVersion(*asVersion, as, logger, r.recorder) {
return reconcile.Result{}, nil // will eventually retry
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/controller/association/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
"go.elastic.co/apm"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -68,7 +68,7 @@ func IsConfiguredIfSet(association commonv1.Association, r record.EventRecorder)
if (&ref).IsDefined() && !association.AssociationConf().IsConfigured() {
r.Event(
association,
v1.EventTypeWarning,
corev1.EventTypeWarning,
events.EventAssociationError,
"Association backend for "+association.AssociatedType()+" is not configured",
)
Expand All @@ -94,7 +94,7 @@ func ElasticsearchAuthSettings(c k8s.Client, association commonv1.Association) (
}

secretObjKey := types.NamespacedName{Namespace: association.GetNamespace(), Name: assocConf.AuthSecretName}
var secret v1.Secret
var secret corev1.Secret
if err := c.Get(secretObjKey, &secret); err != nil {
return "", "", err
}
Expand All @@ -107,10 +107,10 @@ func ElasticsearchAuthSettings(c k8s.Client, association commonv1.Association) (
return assocConf.AuthSecretKey, string(data), nil
}

// AllowVersion returns true if the given resourceVersion is lower or equal to the associations version.
// AllowVersion returns true if the given resourceVersion is lower or equal to the associations' versions.
// For example: Kibana in version 7.8.0 cannot be deployed if its Elasticsearch association reports version 7.7.0.
// Referenced resources version is parsed from the association conf annotation.
func AllowVersion(resourceVersion version.Version, associated commonv1.Associated, logger logr.Logger) bool {
func AllowVersion(resourceVersion version.Version, associated commonv1.Associated, logger logr.Logger, recorder record.EventRecorder) bool {
for _, assoc := range associated.GetAssociations() {
assocRef := assoc.AssociationRef()
if !assocRef.IsDefined() {
Expand All @@ -125,14 +125,15 @@ func AllowVersion(resourceVersion version.Version, associated commonv1.Associate
}
refVer, err := version.Parse(assoc.AssociationConf().Version)
if err != nil {
logger.Error(err, "Invalid version found in association conf", "association_version", refVer)
logger.Error(err, "Invalid version found in association conf", "association_version", assoc.AssociationConf().Version)
sebgl marked this conversation as resolved.
Show resolved Hide resolved
return false
}
if !refVer.IsSameOrAfter(resourceVersion) {
sebgl marked this conversation as resolved.
Show resolved Hide resolved
// the version of the referenced resource (example: Elasticsearch) is lower than
// the desired version of the reconciled resource (example: Kibana)
logger.Info("Delaying version deployment since an associated resource is not upgraded yet",
"version", resourceVersion, "ref_namespace", assocRef.Namespace, "ref_name", assocRef.Name, "ref_version", refVer)
msg := "Delaying version deployment since an associated resource is not upgraded yet"
logger.Info(msg, "version", resourceVersion, "ref_namespace", assocRef.Namespace, "ref_name", assocRef.Name, "ref_version", refVer)
recorder.Event(associated, corev1.EventTypeWarning, events.EventReasonDelayed, msg)
sebgl marked this conversation as resolved.
Show resolved Hide resolved
return false
}
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/controller/association/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,10 @@ func TestAllowVersion(t *testing.T) {
associated commonv1.Associated
}
tests := []struct {
name string
args args
want bool
name string
args args
want bool
wantEvent bool
}{
{
name: "no association specified: allow",
Expand All @@ -458,12 +459,13 @@ func TestAllowVersion(t *testing.T) {
want: true,
},
{
name: "one referenced resource runs a lower version: don't allow",
name: "one referenced resource runs a lower version: don't allow and emit an event",
args: args{
resourceVersion: version.MustParse("7.7.0"),
associated: apmTwoAssocWithVersions([]string{"7.7.0", "7.6.0"}),
},
want: false,
want: false,
wantEvent: true,
},
{
name: "no version set in the association conf: don't allow",
Expand Down Expand Up @@ -492,10 +494,22 @@ func TestAllowVersion(t *testing.T) {
}
for _, tt := range tests {
logger := log.WithValues("a", "b")
recorder := record.NewFakeRecorder(10)
t.Run(tt.name, func(t *testing.T) {
if got := AllowVersion(tt.args.resourceVersion, tt.args.associated, logger); got != tt.want {
if got := AllowVersion(tt.args.resourceVersion, tt.args.associated, logger, recorder); got != tt.want {
t.Errorf("AllowVersion() = %v, want %v", got, tt.want)
}
})
if tt.wantEvent {
require.NotEmpty(t, <-recorder.Events)
} else {
// no event expected
select {
case e := <-recorder.Events:
require.Fail(t, "no event expected but got one", "event", e)
default:
// ok
}
}
}
}
2 changes: 1 addition & 1 deletion pkg/controller/association/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (r *Reconciler) getElasticsearch(
if apierrors.IsNotFound(err) {
// ES is not found, remove any existing backend configuration and retry in a bit.
if err := RemoveAssociationConf(r.Client, association.Associated(), association.AssociationConfAnnotationName()); err != nil && !apierrors.IsConflict(err) {
r.log(association).Error(err, "Failed to remove Elasticsearch output from EnterpriseSearch object")
r.log(association).Error(err, "Failed to remove Elasticsearch association conf")
sebgl marked this conversation as resolved.
Show resolved Hide resolved
return esv1.Elasticsearch{}, commonv1.AssociationPending, err
}
return esv1.Elasticsearch{}, commonv1.AssociationPending, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/beat/common/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func Reconcile(
if err != nil {
return results.WithError(err)
}
if !association.AllowVersion(*beatVersion, &params.Beat, params.Logger) {
if !association.AllowVersion(*beatVersion, &params.Beat, params.Logger, params.Recorder()) {
return results // will eventually retry
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (r *ReconcileEnterpriseSearch) doReconcile(ctx context.Context, ent entv1be
return reconcile.Result{}, err
}
logger := log.WithValues("namespace", ent.Namespace, "ent_name", ent.Name)
if !association.AllowVersion(*entVersion, ent.Associated(), logger) {
if !association.AllowVersion(*entVersion, ent.Associated(), logger, r.recorder) {
return reconcile.Result{}, nil // will eventually retry once updated
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/kibana/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (d *driver) Reconcile(
}

logger := log.WithValues("namespace", kb.Namespace, "kb_name", kb.Name)
if !association.AllowVersion(d.version, kb, logger) {
if !association.AllowVersion(d.version, kb, logger, d.Recorder()) {
return results // will eventually retry
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/beat/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBeatKibanaRefWithTLSDisabled(t *testing.T) {
WithElasticsearchRef(esBuilder.Ref()).
WithKibanaRef(kbBuilder.Ref())

fbBuilder = applyYamls(t, fbBuilder, e2eFilebeatConfig, e2eFilebeatPodTemplate)
fbBuilder = beat.ApplyYamls(t, fbBuilder, E2EFilebeatConfig, E2EFilebeatPodTemplate)

dashboardCheck := getDashboardCheck(
esBuilder,
Expand Down