diff --git a/operators/pkg/controller/common/reconciler/expectations.go b/operators/pkg/controller/common/reconciler/expectations.go
deleted file mode 100644
index 4ac0c5452b..0000000000
--- a/operators/pkg/controller/common/reconciler/expectations.go
+++ /dev/null
@@ -1,327 +0,0 @@
-// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
-// or more contributor license agreements. Licensed under the Elastic License;
-// you may not use this file except in compliance with the Elastic License.
-
-package reconciler
-
-// Expectations are a way for controllers to mitigate effects of
-// the K8s client cache lagging behind the apiserver.
-//
-// ## Context: client cache might be out-of-sync
-//
-// The default K8s client implementation uses a cache for all resources we get or list.
-// Listing pods effectively returns pods that have been observed in the cache, relying on a
-// watch being set up by the client behind the scenes.
-// Hence, resources we get from a list operation may slightly lag behind resources in the apiserver.
-// The cache is not invalidated on resource creation. The following can happen in a controller:
-//
-// * list pods: we get 2
-// * create a new pod
-// * list pods again: we still get 2 (cache is not in sync yet)
-//
-// This could lead to creating the pod a second time (with a different generated name) at the
-// next iteration of the reconciliation loop.
-// The same goes for deletions.
-//
-// This is only a problem for resources whose name is non-deterministic. Creating the same
-// resource twice with the same name is considered OK, since the second attempt would simply fail.
-//
-// ## Expectations as a solution to mitigate cache inconsistencies
-//
-// The ReplicaSets implementation in K8s relies on runtime expectations to mitigate those inconsistencies.
-// See the expectations implementation: https://github.com/kubernetes/kubernetes/blob/v1.13.2/pkg/controller/controller_utils.go
-// And its usage for ReplicaSets: https://github.com/kubernetes/kubernetes/blob/v1.13.2/pkg/controller/replicaset/replica_set.go#L90
-//
-// The idea is the following:
-//
-// * When a resource is created, increase the expected creations for this resource.
-// Example: "expect 1 pod creation for this ElasticsearchCluster". Note that expectations
-// are associated with the ElasticsearchCluster resource here, but effectively observe pods.
-// * Once the resource creation event is observed, decrease the expected creations (expectation observed).
-// * Expectations are considered satisfied when the count is equal to 0: we can consider our cache in-sync.
-// * Checking whether expectations are satisfied within a reconciliation loop iteration is a way to know
-// whether we can move forward to the next reconciliation steps with an up-to-date cache.
-// * The same goes for deletions.
-//
-// Expectations have a time-to-live (5 minutes). Once reached, we consider an expectation to be fulfilled, even
-// though its internal counters may not be 0. This is to avoid staying stuck with inconsistent expectation events.
-//
-// ## Why not reuse the K8s expectations implementation?
-//
-// We could absolutely reuse the existing `controller.Expectations` implementation.
-// However, doing so forces us to vendor the whole `kubernetes` package tree, which in turn
-// requires vendoring the apiserver package tree. That's a lot of imports.
-//
-// Also, the Expectations API is not very user-friendly.
-//
-// A common usage in our situation is to increment expectations whenever we create a pod.
-// There are two ways to do that with the K8s Expectations API:
-//
-// * `expectations.ExpectCreations(controllerKey string, adds int)`: overrides any previous value.
-// * `expectations.RaiseExpectations(controllerKey string, add, del int)`: only works if expectations exist,
-// meaning `expectations.SetExpectations` was called at least once before.
-//
-// This is replaced in our implementation by a simpler `expectations.ExpectCreations(controllerKey)`,
-// which increments the creation counter, and creates it if it doesn't exist yet.
-//
-// A few other things differ in our implementation from the K8s one:
-//
-// * We don't accept negative counters as a valid value: it does not make sense to set the creations
-// counter to -1 if it was already at 0 (could be a leftover creation from a previous controller that
-// we don't care about, since we don't have expectations for it).
-// * Once an expectation's TTL is reached, we consider that we probably missed an event, hence we choose to
-// reset expectations to 0 explicitly, instead of keeping the counter values while still considering the
-// expectations to be fulfilled.
-// * `controller.UIDTrackingControllerExpectations` is an extended expectations implementation meant to handle
-// update events that have a non-zero DeletionTimestamp (can be issued multiple times but should be counted
-// only once). Since we rely on controller-runtime deletion events instead, we don't need this here.
-// * We only use atomic int64 here, no generic `cache.Store`: no need to deal with error handling in the caller.
-//
-// ## Usage
-//
-// Expected usage pseudo-code:
-// ```
-// if !expectations.Fulfilled(responsibleResourceID) {
-//   // expected creations and deletions are not fulfilled yet,
-//   // let's requeue
-//   return
-// }
-// for _, res := range resourcesToCreate {
-//   // expect a creation
-//   expectations.ExpectCreation(responsibleResourceID)
-//   if err := client.Create(res); err != nil {
-//     // cancel our expectation, since the resource wasn't created
-//     expectations.CreationObserved(responsibleResourceID)
-//     return err
-//   }
-// }
-// // same mechanism for deletions
-// ```
-//
-// Note that the `responsibleResourceID` in this context does not map to the resources we create
-// or delete. For instance, it would be the ID of our ElasticsearchCluster, even though the
-// resources that we effectively create and delete are pods associated with this cluster.
-//
-
-import (
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/finalizer"
-	"k8s.io/apimachinery/pkg/types"
-)
-
-const (
-	// ExpectationsTTLNanosec is the default expectations time-to-live,
-	// for cases where we expect an event (creation or deletion) that never happens.
-	//
-	// Set to 5 minutes similar to https://github.com/kubernetes/kubernetes/blob/v1.13.2/pkg/controller/controller_utils.go
-	ExpectationsTTLNanosec = 5 * time.Minute // time is internally represented as int64 nanoseconds
-
-	// ExpectationsFinalizerName designates a finalizer to clean up expectations on ES cluster deletion.
-	ExpectationsFinalizerName = "expectations.finalizers.elasticsearch.k8s.elastic.co"
-)
-
-// NewExpectations creates expectations with the default TTL.
-func NewExpectations() *Expectations { - return &Expectations{ - mutex: sync.RWMutex{}, - counters: map[types.NamespacedName]*expectationsCounters{}, - ttl: ExpectationsTTLNanosec, - } -} - -// Expectations holds our creation and deletions expectations for -// various resources, up to the configured TTL. -// Safe for concurrent use. -type Expectations struct { - mutex sync.RWMutex - counters map[types.NamespacedName]*expectationsCounters - ttl time.Duration -} - -// ExpectCreation marks a creation for the given resource as expected. -func (e *Expectations) ExpectCreation(namespacedName types.NamespacedName) { - e.getOrCreateCounters(namespacedName).AddCreations(1) -} - -// ExpectDeletion marks a deletion for the given resource as expected. -func (e *Expectations) ExpectDeletion(namespacedName types.NamespacedName) { - e.getOrCreateCounters(namespacedName).AddDeletions(1) -} - -// CreationObserved marks a creation event for the given resource as observed, -// cancelling the effect of a previous call to e.ExpectCreation. -func (e *Expectations) CreationObserved(namespacedName types.NamespacedName) { - e.getOrCreateCounters(namespacedName).AddCreations(-1) -} - -// DeletionObserved marks a deletion event for the given resource as observed, -// cancelling the effect of a previous call to e.ExpectDeletion. -func (e *Expectations) DeletionObserved(namespacedName types.NamespacedName) { - e.getOrCreateCounters(namespacedName).AddDeletions(-1) -} - -// Fulfilled returns true if all the expectations for the given resource -// are fulfilled (both creations and deletions). Meaning we can consider -// the controller is in-sync with resources in the apiserver. -func (e *Expectations) Fulfilled(namespacedName types.NamespacedName) bool { - creations, deletions := e.get(namespacedName) - if creations == 0 && deletions == 0 { - return true - } - return false -} - -// get creations and deletions expectations for the expected resource. -func (e *Expectations) get(namespacedName types.NamespacedName) (creations int64, deletions int64) { - return e.getOrCreateCounters(namespacedName).Get() -} - -// getOrCreateCounters returns the counters associated to the given resource. -// They may not exist yet: in such case we create and initialize them first. -func (e *Expectations) getOrCreateCounters(namespacedName types.NamespacedName) *expectationsCounters { - e.mutex.RLock() - counters, exists := e.counters[namespacedName] - e.mutex.RUnlock() - if !exists { - counters = e.createCounters(namespacedName) - } - return counters -} - -func (e *Expectations) createCounters(namespacedName types.NamespacedName) *expectationsCounters { - e.mutex.Lock() - defer e.mutex.Unlock() - // if this method is called, counters probably don't exist yet - // still re-check with lock acquired in case they would be created - // in-between 2 concurrent calls to e.getOrCreateCounters - counters, exists := e.counters[namespacedName] - if exists { - return counters - } - counters = newExpectationsCounters(e.ttl) - e.counters[namespacedName] = counters - return counters -} - -// expectationsCounters hold creations and deletions counters, -// and manages counters TTL through their last activity timestamp. -// Counters that would go below 0 will be reset to 0. -// Expectations that would exceed their TTL will be reset to 0. -// Safe for concurrent use. 
-type expectationsCounters struct { - creations *int64 // atomic int64 counter - deletions *int64 // atomic int64 counter - timestamp *int64 // unix timestamp in nanoseconds - ttl int64 // duration in nanoseconds -} - -// newExpectationsCounters returns an initiliazed expectationsCounters -// with the given ttl, and timestamp set to now. -func newExpectationsCounters(ttl time.Duration) *expectationsCounters { - creations := int64(0) - deletions := int64(0) - timestamp := timestampNow() - return &expectationsCounters{ - creations: &creations, - deletions: &deletions, - timestamp: ×tamp, - ttl: ttl.Nanoseconds(), - } -} - -// Get returns the current creations and deletions counters. -// If counters are expired, they are reset to 0 beforehand. -func (e *expectationsCounters) Get() (creations, deletions int64) { - if e.isExpired() { - e.reset() - } - return e.getPtrValue(e.creations), e.getPtrValue(e.deletions) -} - -// AddCreations increments the creations counter with the given value, -// which can be negative for substractions. -// If the value goes below 0, it will be reset to 0. -func (e *expectationsCounters) AddCreations(value int64) { - e.add(e.creations, value) -} - -// AddDeletions increments the deletions counter with the given value, -// which can be negative for substractions. -// If the value goes below 0, it will be reset to 0. -func (e *expectationsCounters) AddDeletions(value int64) { - e.add(e.deletions, value) -} - -// isExpired returns true if the last operation on the counters -// exceeds the configured TTL. -func (e *expectationsCounters) isExpired() bool { - if e.timestamp == nil { - return false - } - timestamp := atomic.LoadInt64(e.timestamp) - if timestampNow()-timestamp > e.ttl { - return true - } - return false -} - -// resetTimestamp sets the timestamp value to the current time. -func (e *expectationsCounters) resetTimestamp() { - atomic.StoreInt64(e.timestamp, timestampNow()) -} - -// reset sets counters values to 0, and the -// timestamp value to the current time. -func (e *expectationsCounters) reset() { - atomic.StoreInt64(e.creations, 0) - atomic.StoreInt64(e.deletions, 0) - e.resetTimestamp() -} - -// getPtrValue returns the int64 value stored at the given pointer. -// Meant to be used for internal values, eg. `getPtrValue(e.creations)`. -func (e *expectationsCounters) getPtrValue(ptr *int64) int64 { - value := atomic.LoadInt64(ptr) - if value < 0 { - // In-between situation where we have a negative value, - // return 0 instead (see `e.add` implementation). - return 0 - } - return value -} - -// add increments the int64 stored at the given pointer with the given value, -// which can be negative for substractions. -// Meant to be used for internal values, eg. `add(e.creations, -1)`. -// If the value goes below 0, it will be reset to 0. -func (e *expectationsCounters) add(ptr *int64, value int64) { - e.resetTimestamp() - newValue := atomic.AddInt64(ptr, value) - if newValue < 0 && value < 0 { - // We are reaching a negative value after a substraction: - // cancel what we just did. - // The value is still negative in-between these 2 atomic ops. - atomic.AddInt64(ptr, -value) - } -} - -// timestampNow returns the current unix timestamp in nanoseconds -func timestampNow() int64 { - return time.Now().UnixNano() -} - -// ExpectationsFinalizer removes the given cluster entry from the expectations map. 
-func ExpectationsFinalizer(cluster types.NamespacedName, expectations *Expectations) finalizer.Finalizer { - return finalizer.Finalizer{ - Name: ExpectationsFinalizerName, - Execute: func() error { - expectations.mutex.Lock() - defer expectations.mutex.Unlock() - delete(expectations.counters, cluster) - return nil - }, - } -} diff --git a/operators/pkg/controller/common/reconciler/expectations_test.go b/operators/pkg/controller/common/reconciler/expectations_test.go deleted file mode 100644 index 322f089197..0000000000 --- a/operators/pkg/controller/common/reconciler/expectations_test.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package reconciler - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/types" -) - -var nsn1 = types.NamespacedName{ - Namespace: "namespace", - Name: "name", -} - -var nsn2 = types.NamespacedName{ - Namespace: "namespace", - Name: "name2", -} - -func checkExpectations(t *testing.T, e *Expectations, namespacedName types.NamespacedName, expectedCreations int64, expectedDeletions int64) { - // check creations and deletions counters - actualCreations, actualDeletions := e.get(namespacedName) - require.Equal(t, expectedCreations, actualCreations) - require.Equal(t, expectedDeletions, actualDeletions) - // check expectations fulfilled - expectedFulfilled := false - if expectedCreations == 0 && expectedDeletions == 0 { - expectedFulfilled = true - } - require.Equal(t, expectedFulfilled, e.Fulfilled(namespacedName)) -} - -func TestExpectationsTTL(t *testing.T) { - // validate default behaviour with default TTL - exp := NewExpectations() - exp.ExpectCreation(nsn1) - checkExpectations(t, exp, nsn1, 1, 0) - // same test, but with a custom short TTL - exp = NewExpectations() - exp.ttl = 1 * time.Nanosecond - exp.ExpectCreation(nsn1) - // counters should be reset and expectations fulfilled - // once TTL is reached - time.Sleep(2 * time.Nanosecond) - checkExpectations(t, exp, nsn1, 0, 0) -} - -func TestExpectations(t *testing.T) { - // tests are performing operations and checks on the same expectations object, - // with state preserved between tests - e := NewExpectations() - tests := []struct { - name string - events func(e *Expectations) - expected map[types.NamespacedName][2]int64 // namespacedName -> [creations, deletions] - }{ - { - name: "empty", - events: func(e *Expectations) {}, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{0, 0}, - nsn2: [2]int64{0, 0}, - }, - }, - { - name: "add an expected creation for nsn1", - events: func(e *Expectations) { - e.ExpectCreation(nsn1) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{1, 0}, - nsn2: [2]int64{0, 0}, - }, - }, - { - name: "add 2 more expected creations for nsn1", - events: func(e *Expectations) { - e.ExpectCreation(nsn1) - e.ExpectCreation(nsn1) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{3, 0}, - nsn2: [2]int64{0, 0}, - }, - }, - { - name: "add an expected creation for nsn2", - events: func(e *Expectations) { - e.ExpectCreation(nsn2) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{3, 0}, - nsn2: [2]int64{1, 0}, - }, - }, - { - name: "observe creation for nsn1", - events: func(e *Expectations) { - e.CreationObserved(nsn1) - }, - expected: 
map[types.NamespacedName][2]int64{ - nsn1: [2]int64{2, 0}, - nsn2: [2]int64{1, 0}, - }, - }, - { - name: "observe 2 creations for nsn1", - events: func(e *Expectations) { - e.CreationObserved(nsn1) - e.CreationObserved(nsn1) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{0, 0}, - nsn2: [2]int64{1, 0}, - }, - }, - { - name: "observe creation for nsn2", - events: func(e *Expectations) { - e.CreationObserved(nsn2) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{0, 0}, - nsn2: [2]int64{0, 0}, - }, - }, - { - name: "observe creation when counter is already at 0 should be a no-op", - events: func(e *Expectations) { - e.CreationObserved(nsn1) - }, - expected: map[types.NamespacedName][2]int64{ - nsn1: [2]int64{0, 0}, - nsn2: [2]int64{0, 0}, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.events(e) - for nsn, expectationsSlice := range tt.expected { - checkExpectations(t, e, nsn, expectationsSlice[0], expectationsSlice[1]) - } - }) - } -} - -func TestExpectationsFinalizer(t *testing.T) { - expectations := NewExpectations() - expectations.ExpectCreation(nsn1) - require.Contains(t, expectations.counters, nsn1) - // applying finalizer should remove the entry from the map - err := ExpectationsFinalizer(nsn1, expectations).Execute() - require.NoError(t, err) - require.NotContains(t, expectations.counters, nsn1) - // applying finalizer on non-existing entry should be fine - err = ExpectationsFinalizer(nsn1, expectations).Execute() - require.NoError(t, err) -} diff --git a/operators/pkg/controller/common/watches/expectations_watch.go b/operators/pkg/controller/common/watches/expectations_watch.go deleted file mode 100644 index 9f91cecae5..0000000000 --- a/operators/pkg/controller/common/watches/expectations_watch.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package watches - -import ( - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" -) - -// ExpectationsResourceRetriever is a function that allows retrieving, from a given resource, -// the associated resource that holds expectations resources. -// For instance, from a given pod, we might want to retrieve the ElasticsearchCluster associated -// to it (see `label.ClusterFromResourceLabels`). -type ExpectationsResourceRetriever func(metaObject metav1.Object) (types.NamespacedName, bool) - -// ExpectationsWatch is an event handler for watches that markes resources creations and deletions -// as observed for the given reconciler expectations. -type ExpectationsWatch struct { - handlerKey string - expectations *reconciler.Expectations - resourceRetriever ExpectationsResourceRetriever -} - -// Make sure our ExpectationsWatch implements HandlerRegistration. -var _ HandlerRegistration = &ExpectationsWatch{} - -// NewExpectationsWatch creates an ExpectationsWatch from the given arguments. 
-func NewExpectationsWatch(handlerKey string, expectations *reconciler.Expectations, resourceRetriever ExpectationsResourceRetriever) *ExpectationsWatch { - return &ExpectationsWatch{ - handlerKey: handlerKey, - expectations: expectations, - resourceRetriever: resourceRetriever, - } -} - -// Key returns the key associated to this handler. -func (p *ExpectationsWatch) Key() string { - return p.handlerKey -} - -// EventHandler returns the ExpectationsWatch as an handler.EventHandler. -func (p *ExpectationsWatch) EventHandler() handler.EventHandler { - return p -} - -// Create marks a resource creation as observed in the expectations. -func (p *ExpectationsWatch) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { - resource, exists := p.resourceRetriever(evt.Meta) - if exists { - p.expectations.CreationObserved(resource) - log.V(1).Info("Marking creation observed in expectations", "name", resource.Name, "namespace", resource.Namespace) - } -} - -// Delete marks a resource deletion as observed in the expectations. -func (p *ExpectationsWatch) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { - resource, exists := p.resourceRetriever(evt.Meta) - if exists { - p.expectations.DeletionObserved(resource) - log.V(1).Info("Marking deletion observed in expectations", "name", resource.Name, "namespace", resource.Namespace) - } -} - -// Update is a no-op operation in this context. -func (p *ExpectationsWatch) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {} - -// Generic is a no-op operation in this context. -func (p *ExpectationsWatch) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) {} diff --git a/operators/pkg/controller/common/watches/expectations_watch_test.go b/operators/pkg/controller/common/watches/expectations_watch_test.go deleted file mode 100644 index e921a076e7..0000000000 --- a/operators/pkg/controller/common/watches/expectations_watch_test.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
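The handler deleted above turns raw pod create/delete events into expectation bookkeeping. As an illustration only (the actual registration code is not part of this diff), here is how such a handler could be wired into a controller-runtime controller, using the constructor above and the `label.ClusterFromResourceLabels` retriever exercised by the test below; the `watchPodExpectations` helper name and the `example` package are hypothetical:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/source"

	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler"
	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/watches"
	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
)

// watchPodExpectations registers the expectations watch so that every pod creation or
// deletion event observed by the controller decrements the per-cluster expectation counters.
func watchPodExpectations(c controller.Controller, expectations *reconciler.Expectations) error {
	h := watches.NewExpectationsWatch(
		"pod-expectations",              // handler key, same value the test below uses
		expectations,                    // per-cluster creation/deletion counters
		label.ClusterFromResourceLabels, // maps a pod to its owning Elasticsearch cluster
	)
	return c.Watch(&source.Kind{Type: &corev1.Pod{}}, h)
}
```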
- -package watches - -import ( - "testing" - - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler" - "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label" - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/event" -) - -const testHandlerKey = "pod-expectations" - -var testCluster = types.NamespacedName{ - Namespace: "namespace", - Name: "cluster", -} - -func TestExpectationsWatch_Key(t *testing.T) { - w := NewExpectationsWatch(testHandlerKey, nil, label.ClusterFromResourceLabels) - require.Equal(t, testHandlerKey, w.Key()) -} - -func createPodMetaObject(t *testing.T, name string) metav1.Object { - pod1 := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: testCluster.Namespace, - Labels: map[string]string{ - label.ClusterNameLabelName: testCluster.Name, - }, - }, - } - asMetaObj, err := meta.Accessor(pod1) - require.NoError(t, err) - return asMetaObj -} - -func TestExpectationsWatch_Create(t *testing.T) { - expectations := reconciler.NewExpectations() - w := NewExpectationsWatch(testHandlerKey, expectations, label.ClusterFromResourceLabels) - - tests := []struct { - name string - events func() - expectedFulfilled bool - }{ - { - name: "initially fulfilled", - events: func() {}, - expectedFulfilled: true, - }, - { - name: "expect 2 creations", - events: func() { - expectations.ExpectCreation(testCluster) - expectations.ExpectCreation(testCluster) - }, - expectedFulfilled: false, - }, - { - name: "observe 1 creation", - events: func() { - w.Create(event.CreateEvent{ - Meta: createPodMetaObject(t, "pod1"), - }, nil) - }, - expectedFulfilled: false, - }, - { - name: "observe the 2nd creation", - events: func() { - w.Create(event.CreateEvent{ - Meta: createPodMetaObject(t, "pod2"), - }, nil) - }, - expectedFulfilled: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.events() - require.Equal(t, tt.expectedFulfilled, expectations.Fulfilled(testCluster)) - }) - } -} - -func TestExpectationsWatch_Delete(t *testing.T) { - expectations := reconciler.NewExpectations() - w := NewExpectationsWatch(testHandlerKey, expectations, label.ClusterFromResourceLabels) - - tests := []struct { - name string - events func() - expectedFulfilled bool - }{ - { - name: "initially fulfilled", - events: func() {}, - expectedFulfilled: true, - }, - { - name: "expect 2 deletions", - events: func() { - expectations.ExpectDeletion(testCluster) - expectations.ExpectDeletion(testCluster) - }, - expectedFulfilled: false, - }, - { - name: "observe 1 deletion", - events: func() { - w.Delete(event.DeleteEvent{ - Meta: createPodMetaObject(t, "pod1"), - }, nil) - }, - expectedFulfilled: false, - }, - { - name: "observe the 2nd deletions", - events: func() { - w.Delete(event.DeleteEvent{ - Meta: createPodMetaObject(t, "pod2"), - }, nil) - }, - expectedFulfilled: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.events() - require.Equal(t, tt.expectedFulfilled, expectations.Fulfilled(testCluster)) - }) - } -} diff --git a/operators/pkg/controller/elasticsearch/client/client.go b/operators/pkg/controller/elasticsearch/client/client.go index e7e2a79218..3f629783dd 100644 --- a/operators/pkg/controller/elasticsearch/client/client.go +++ b/operators/pkg/controller/elasticsearch/client/client.go @@ -69,13 +69,15 @@ type 
Client interface { GetClusterInfo(ctx context.Context) (Info, error) // GetClusterState returns the current cluster state GetClusterState(ctx context.Context) (ClusterState, error) + // GetClusterRoutingAllocation retrieves the cluster routing allocation settings. + GetClusterRoutingAllocation(ctx context.Context) (ClusterRoutingAllocation, error) // UpdateSettings updates the settings of a cluster. UpdateSettings(ctx context.Context, settings Settings) error // ExcludeFromShardAllocation takes a comma-separated string of node names and // configures transient allocation excludes for the given nodes. ExcludeFromShardAllocation(ctx context.Context, nodes string) error - // DisableShardAllocation disables shards allocation on the cluster. - DisableShardAllocation(ctx context.Context) error + // DisableReplicaShardsAllocation disables shards allocation on the cluster (only primaries are allocated). + DisableReplicaShardsAllocation(ctx context.Context) error // EnableShardAllocation enables shards allocation on the cluster. EnableShardAllocation(ctx context.Context) error // SyncedFlush requests a synced flush on the cluster. diff --git a/operators/pkg/controller/elasticsearch/client/model.go b/operators/pkg/controller/elasticsearch/client/model.go index d967abdc32..1fadb6eb3d 100644 --- a/operators/pkg/controller/elasticsearch/client/model.go +++ b/operators/pkg/controller/elasticsearch/client/model.go @@ -52,6 +52,14 @@ type Nodes struct { Nodes map[string]Node `json:"nodes"` } +func (n Nodes) Names() []string { + names := make([]string, 0, len(n.Nodes)) + for _, node := range n.Nodes { + names = append(names, node.Name) + } + return names +} + // Node partially models an Elasticsearch node retrieved from /_nodes type Node struct { Name string `json:"name"` @@ -187,13 +195,34 @@ func (s Shard) Key() string { // AllocationSettings model a subset of the supported attributes for dynamic Elasticsearch cluster settings. type AllocationSettings struct { - ExcludeName string `json:"cluster.routing.allocation.exclude._name"` - Enable string `json:"cluster.routing.allocation.enable"` + Cluster ClusterRoutingSettings `json:"cluster,omitempty"` } // TODO awareness settings +type ClusterRoutingSettings struct { + Routing RoutingSettings `json:"routing,omitempty"` +} + +type RoutingSettings struct { + Allocation RoutingAllocationSettings `json:"allocation,omitempty"` +} + +type RoutingAllocationSettings struct { + Exclude AllocationExclude `json:"exclude,omitempty"` + Enable string `json:"enable,omitempty"` +} + +type AllocationExclude struct { + Name string `json:"_name,omitempty"` +} + +func (s AllocationSettings) IsShardsAllocationEnabled() bool { + enable := s.Cluster.Routing.Allocation.Enable + return enable == "" || enable == "all" +} + // ClusterRoutingAllocation models a subset of transient allocation settings for an Elasticsearch cluster. type ClusterRoutingAllocation struct { - Transient AllocationSettings `json:"transient"` + Transient AllocationSettings `json:"transient,omitempty"` } // DiscoveryZen set minimum number of master eligible nodes that must be visible to form a cluster. 
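The nested types above map the `cluster.routing.allocation` block of the `/_cluster/settings` API. As a test-style sketch (assuming the same package and imports as the model_test.go hunk below, i.e. `encoding/json` and testify's `require`), an absent `enable` value is reported as allocation enabled, while the `primaries` value written by `DisableReplicaShardsAllocation` is reported as disabled:

```go
func TestIsShardsAllocationEnabled(t *testing.T) {
	// No explicit cluster.routing.allocation.enable: defaults to enabled ("" behaves like "all").
	var settings ClusterRoutingAllocation
	require.NoError(t, json.Unmarshal([]byte(`{"persistent":{},"transient":{}}`), &settings))
	require.True(t, settings.Transient.IsShardsAllocationEnabled())

	// "primaries" (what DisableReplicaShardsAllocation sets) must be reported as disabled,
	// so that the operator later knows it has something to re-enable.
	settings = ClusterRoutingAllocation{}
	sample := `{"transient":{"cluster":{"routing":{"allocation":{"enable":"primaries"}}}}}`
	require.NoError(t, json.Unmarshal([]byte(sample), &settings))
	require.False(t, settings.Transient.IsShardsAllocationEnabled())
}
```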
diff --git a/operators/pkg/controller/elasticsearch/client/model_test.go b/operators/pkg/controller/elasticsearch/client/model_test.go index 27173e9d1b..f183323a6d 100644 --- a/operators/pkg/controller/elasticsearch/client/model_test.go +++ b/operators/pkg/controller/elasticsearch/client/model_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestModel_RemoteCluster(t *testing.T) { @@ -56,3 +57,13 @@ func TestModel_RemoteCluster(t *testing.T) { }) } } + +func TestClusterRoutingAllocation(t *testing.T) { + clusterSettingsSample := `{"persistent":{},"transient":{"cluster":{"routing":{"allocation":{"enable":"none","exclude":{"_name":"excluded"}}}}}}` + expected := ClusterRoutingAllocation{Transient: AllocationSettings{Cluster: ClusterRoutingSettings{Routing: RoutingSettings{Allocation: RoutingAllocationSettings{Enable: "none", Exclude: AllocationExclude{Name: "excluded"}}}}}} + + var settings ClusterRoutingAllocation + require.NoError(t, json.Unmarshal([]byte(clusterSettingsSample), &settings)) + require.Equal(t, expected, settings) + require.Equal(t, false, settings.Transient.IsShardsAllocationEnabled()) +} diff --git a/operators/pkg/controller/elasticsearch/client/v6.go b/operators/pkg/controller/elasticsearch/client/v6.go index 4ba5baec06..6b740dd35e 100644 --- a/operators/pkg/controller/elasticsearch/client/v6.go +++ b/operators/pkg/controller/elasticsearch/client/v6.go @@ -19,6 +19,11 @@ func (c *clientV6) GetClusterInfo(ctx context.Context) (Info, error) { return info, c.get(ctx, "/", &info) } +func (c *clientV6) GetClusterRoutingAllocation(ctx context.Context) (ClusterRoutingAllocation, error) { + var settings ClusterRoutingAllocation + return settings, c.get(ctx, "/_cluster/settings", &settings) +} + func (c *clientV6) GetClusterState(ctx context.Context) (ClusterState, error) { var clusterState ClusterState return clusterState, c.get(ctx, "/_cluster/state/dispatcher,master_node,nodes,routing_table", &clusterState) @@ -29,18 +34,50 @@ func (c *clientV6) UpdateSettings(ctx context.Context, settings Settings) error } func (c *clientV6) ExcludeFromShardAllocation(ctx context.Context, nodes string) error { - allocationSetting := ClusterRoutingAllocation{AllocationSettings{ExcludeName: nodes, Enable: "all"}} - return c.put(ctx, "/_cluster/settings", allocationSetting, nil) + allocationSettings := ClusterRoutingAllocation{ + Transient: AllocationSettings{ + Cluster: ClusterRoutingSettings{ + Routing: RoutingSettings{ + Allocation: RoutingAllocationSettings{ + Exclude: AllocationExclude{ + Name: nodes, + }, + }, + }, + }, + }, + } + return c.put(ctx, "/_cluster/settings", allocationSettings, nil) } func (c *clientV6) EnableShardAllocation(ctx context.Context) error { - allocationSetting := ClusterRoutingAllocation{AllocationSettings{Enable: "all"}} - return c.put(ctx, "/_cluster/settings", allocationSetting, nil) -} - -func (c *clientV6) DisableShardAllocation(ctx context.Context) error { - allocationSetting := ClusterRoutingAllocation{AllocationSettings{Enable: "none"}} - return c.put(ctx, "/_cluster/settings", allocationSetting, nil) + allocationSettings := ClusterRoutingAllocation{ + Transient: AllocationSettings{ + Cluster: ClusterRoutingSettings{ + Routing: RoutingSettings{ + Allocation: RoutingAllocationSettings{ + Enable: "all", + }, + }, + }, + }, + } + return c.put(ctx, "/_cluster/settings", allocationSettings, nil) +} + +func (c *clientV6) DisableReplicaShardsAllocation(ctx context.Context) error { + 
allocationSettings := ClusterRoutingAllocation{ + Transient: AllocationSettings{ + Cluster: ClusterRoutingSettings{ + Routing: RoutingSettings{ + Allocation: RoutingAllocationSettings{ + Enable: "primaries", + }, + }, + }, + }, + } + return c.put(ctx, "/_cluster/settings", allocationSettings, nil) } func (c *clientV6) SyncedFlush(ctx context.Context) error { diff --git a/operators/pkg/controller/elasticsearch/driver/default.go b/operators/pkg/controller/elasticsearch/driver/default.go index 0557e25725..a363ff9c61 100644 --- a/operators/pkg/controller/elasticsearch/driver/default.go +++ b/operators/pkg/controller/elasticsearch/driver/default.go @@ -51,6 +51,8 @@ type defaultDriver struct { // Options are the options that the driver was created with. Options + expectations *Expectations + // supportedVersions verifies whether we can support upgrading from the current pods. supportedVersions esversion.LowestHighestSupportedVersions @@ -197,8 +199,6 @@ func (d *defaultDriver) Reconcile( return results.WithError(err) } - //podsState := mutation.NewPodsState(*resourcesState, observedState) - if err := d.supportedVersions.VerifySupportsExistingPods(resourcesState.CurrentPods.Pods()); err != nil { return results.WithError(err) } @@ -217,59 +217,6 @@ func (d *defaultDriver) Reconcile( return results.WithError(err) } - // - //// There might be some ongoing creations and deletions our k8s client cache - //// hasn't seen yet. In such case, requeue until we are in-sync. - //// Otherwise, we could end up re-creating multiple times the same pod with - //// different generated names through multiple reconciliation iterations. - //if !d.PodsExpectations.Fulfilled(namespacedName) { - // log.Info("Pods creations and deletions expectations are not satisfied yet. Requeuing.") - // return results.WithResult(defaultRequeue) - //} - // - //changes, err := d.calculateChanges(internalUsers, es, *resourcesState) - //if err != nil { - // return results.WithError(err) - //} - // - //log.Info( - // "Calculated all required changes", - // "to_create:", len(changes.ToCreate), - // "to_keep:", len(changes.ToKeep), - // "to_delete:", len(changes.ToDelete), - //) - // - //// restart ES processes that need to be restarted before going on with other changes - //done, err := restart.HandleESRestarts( - // restart.RestartContext{ - // Cluster: es, - // EventsRecorder: reconcileState.Recorder, - // K8sClient: d.Client, - // Changes: *changes, - // Dialer: d.Dialer, - // EsClient: esClient, - // }, - //) - //if err != nil { - // return results.WithError(err) - //} - //if !done { - // log.V(1).Info("Pods restart is not over yet, re-queueing.") - // return results.WithResult(defaultRequeue) - //} - // - //// figure out what changes we can perform right now - //performableChanges, err := mutation.CalculatePerformableChanges(es.Spec.UpdateStrategy, *changes, podsState) - //if err != nil { - // return results.WithError(err) - //} - // - //log.Info( - // "Calculated performable changes", - // "schedule_for_creation_count", len(performableChanges.ToCreate), - // "schedule_for_deletion_count", len(performableChanges.ToDelete), - //) - results.Apply( "reconcile-cluster-license", func() (controller.Result, error) { @@ -327,7 +274,7 @@ func (d *defaultDriver) Reconcile( ) } - res = d.reconcileNodeSpecs(es, podTemplateSpecBuilder, esClient, observedState) + res = d.reconcileNodeSpecs(es, esReachable, podTemplateSpecBuilder, esClient, observedState) if results.WithResults(res).HasError() { return results } @@ -355,31 +302,6 @@ func (d 
*defaultDriver) Reconcile( // results.WithResult(defaultRequeue) // } //} - // - //// List the orphaned PVCs before the Pods are created. - //// If there are some orphaned PVCs they will be adopted and remove sequentially from the list when Pods are created. - //orphanedPVCs, err := pvc.FindOrphanedVolumeClaims(d.Client, es) - //if err != nil { - // return results.WithError(err) - //} - // - //for _, change := range performableChanges.ToCreate { - // d.PodsExpectations.ExpectCreation(namespacedName) - // if err := createElasticsearchPod( - // d.Client, - // d.Scheme, - // es, - // reconcileState, - // change.Pod, - // change.PodSpecCtx, - // orphanedPVCs, - // ); err != nil { - // // pod was not created, cancel our expectation by marking it observed - // d.PodsExpectations.CreationObserved(namespacedName) - // return results.WithError(err) - // } - //} - // passed this point, any pods resource listing should check expectations first if !esReachable { // We cannot manipulate ES allocation exclude settings if the ES cluster @@ -404,119 +326,15 @@ func (d *defaultDriver) Reconcile( // return results.WithResult(defaultRequeue).WithError(err) // } //} - // - //if !changes.HasChanges() { - // // Current state matches expected state - // reconcileState.UpdateElasticsearchOperational(*resourcesState, observedState) - // return results - //} - // - //// Start migrating data away from all pods to be deleted - //leavingNodeNames := pod.PodListToNames(performableChanges.ToDelete.Pods()) - //if err = migration.MigrateData(esClient, leavingNodeNames); err != nil { - // return results.WithError(errors.Wrap(err, "error during migrate data")) - //} - // - //// Shrink clusters by deleting deprecated pods - //if err = d.attemptPodsDeletion( - // performableChanges, - // reconcileState, - // resourcesState, - // observedState, - // results, - // esClient, - // es, - //); err != nil { - // return results.WithError(err) - //} - //// past this point, any pods resource listing should check expectations first - // - //if changes.HasChanges() && !performableChanges.HasChanges() { - // // if there are changes we'd like to perform, but none that were performable, we try again later - // results.WithResult(defaultRequeue) - //} reconcileState.UpdateElasticsearchState(*resourcesState, observedState) return results } -// -//// attemptPodsDeletion deletes a list of pods after checking there is no migrating data for each of them -//func (d *defaultDriver) attemptPodsDeletion( -// changes *mutation.PerformableChanges, -// reconcileState *reconcile.State, -// resourcesState *reconcile.ResourcesState, -// observedState observer.State, -// results *reconciler.Results, -// esClient esclient.Client, -// elasticsearch v1alpha1.Elasticsearch, -//) error { -// newState := make([]corev1.Pod, len(resourcesState.CurrentPods)) -// copy(newState, resourcesState.CurrentPods.Pods()) -// for _, pod := range changes.ToDelete.Pods() { -// newState = removePodFromList(newState, pod) -// preDelete := func() error { -// if d.zen1SettingsUpdater != nil { -// requeue, err := d.zen1SettingsUpdater( -// elasticsearch, -// d.Client, -// esClient, -// newState, -// changes, -// reconcileState) -// -// if err != nil { -// return err -// } -// -// if requeue { -// results.WithResult(defaultRequeue) -// } -// } -// return nil -// } -// -// // do not delete a pod or expect a deletion if a data migration is in progress -// isMigratingData := migration.IsMigratingData(observedState, pod, changes.ToDelete.Pods()) -// if isMigratingData { -// 
log.Info("Skipping deletion because of migrating data", "pod", pod.Name) -// reconcileState.UpdateElasticsearchMigrating(*resourcesState, observedState) -// results.WithResult(defaultRequeue) -// continue -// } -// -// namespacedName := k8s.ExtractNamespacedName(&elasticsearch) -// d.PodsExpectations.ExpectDeletion(namespacedName) -// result, err := deleteElasticsearchPod( -// d.Client, -// reconcileState, -// *resourcesState, -// pod, -// preDelete, -// ) -// if err != nil { -// // pod was not deleted, cancel our expectation by marking it observed -// d.PodsExpectations.DeletionObserved(namespacedName) -// return err -// } -// results.WithResult(result) -// } -// return nil -//} - -// removePodFromList removes a single pod from the list, matching by pod name. -func removePodFromList(pods []corev1.Pod, pod corev1.Pod) []corev1.Pod { - for i, p := range pods { - if p.Name == pod.Name { - return append(pods[:i], pods[i+1:]...) - } - } - return pods -} - func (d *defaultDriver) reconcileNodeSpecs( es v1alpha1.Elasticsearch, + esReachable bool, podSpecBuilder esversion.PodTemplateSpecBuilder, esClient esclient.Client, observedState observer.State, @@ -581,6 +399,11 @@ func (d *defaultDriver) reconcileNodeSpecs( } } + if !esReachable { + // cannot perform downscale or rolling upgrade if we cannot request Elasticsearch + return results.WithResult(defaultRequeue) + } + // Phase 2: handle sset scale down. // We want to safely remove nodes from the cluster, either because the sset requires less replicas, // or because it should be removed entirely. @@ -605,8 +428,15 @@ func (d *defaultDriver) reconcileNodeSpecs( } } + // Phase 3: handle rolling upgrades. + // Control nodes restart (upgrade) by manually decrementing rollingUpdate.Partition. + rollingUpgradesRes := d.handleRollingUpgrades(es, esClient, actualStatefulSets) + results.WithResults(rollingUpgradesRes) + if rollingUpgradesRes.HasError() { + return results + } + // TODO: - // - safe node upgrade (rollingUpdate.Partition + shards allocation) // - change budget // - zen1, zen2 return results @@ -636,7 +466,7 @@ func (d *defaultDriver) scaleStatefulSetDown( // nodes are ordered by highest ordinal first var leavingNodes []string for i := initialReplicas - 1; i > targetReplicas-1; i-- { - leavingNodes = append(leavingNodes, sset.PodName(statefulSet.Name, int(i))) + leavingNodes = append(leavingNodes, sset.PodName(statefulSet.Name, i)) } // TODO: don't remove last master/last data nodes? @@ -672,45 +502,10 @@ func (d *defaultDriver) scaleStatefulSetDown( } } - return nil -} + // TODO: clear allocation excludes -// -//// calculateChanges calculates the changes we'd need to perform to go from the current cluster configuration to the -//// desired one. 
-//func (d *defaultDriver) calculateChanges( -// internalUsers *user.InternalUsers, -// es v1alpha1.Elasticsearch, -// resourcesState reconcile.ResourcesState, -//) (*mutation.Changes, error) { -// expectedPodSpecCtxs, err := d.expectedPodsAndResourcesResolver( -// es, -// pod.NewPodSpecParams{ -// ProbeUser: internalUsers.ProbeUser.Auth(), -// KeystoreUser: internalUsers.KeystoreUser.Auth(), -// UnicastHostsVolume: volume.NewConfigMapVolume( -// name.UnicastHostsConfigMap(es.Name), esvolume.UnicastHostsVolumeName, esvolume.UnicastHostsVolumeMountPath, -// ), -// }, -// d.OperatorImage, -// ) -// if err != nil { -// return nil, err -// } -// -// changes, err := mutation.CalculateChanges( -// es, -// expectedPodSpecCtxs, -// resourcesState, -// func(ctx pod.PodSpecContext) corev1.Pod { -// return esversion.NewPod(es, ctx) -// }, -// ) -// if err != nil { -// return nil, err -// } -// return &changes, nil -//} + return results +} // newElasticsearchClient creates a new Elasticsearch HTTP client for this cluster using the provided user func (d *defaultDriver) newElasticsearchClient(service corev1.Service, user user.User, v version.Version, caCerts []*x509.Certificate) esclient.Client { diff --git a/operators/pkg/controller/elasticsearch/driver/driver.go b/operators/pkg/controller/elasticsearch/driver/driver.go index 7056a84616..a3e4703e9b 100644 --- a/operators/pkg/controller/elasticsearch/driver/driver.go +++ b/operators/pkg/controller/elasticsearch/driver/driver.go @@ -52,9 +52,9 @@ type Options struct { Observers *observer.Manager // DynamicWatches are handles to currently registered dynamic watches. DynamicWatches watches.DynamicWatches - // PodsExpectations control ongoing pod creations and deletions - // that might not be in-sync yet with our k8s client cache - PodsExpectations *reconciler.Expectations + // Expectations control some expectations set on resources in the cache, in order to + // avoid doing certain operations if the cache hasn't seen an up-to-date resource yet. + Expectations *Expectations } // NewDriver returns a Driver that can operate the provided version @@ -66,6 +66,7 @@ func NewDriver(opts Options) (Driver, error) { driver := &defaultDriver{ Options: opts, + expectations: NewGenerationExpectations(), observedStateResolver: opts.Observers.ObservedStateResolver, resourcesStateResolver: esreconcile.NewResourcesStateFromAPI, usersReconciler: user.ReconcileUsers, diff --git a/operators/pkg/controller/elasticsearch/driver/esstate.go b/operators/pkg/controller/elasticsearch/driver/esstate.go new file mode 100644 index 0000000000..0b45dcc513 --- /dev/null +++ b/operators/pkg/controller/elasticsearch/driver/esstate.go @@ -0,0 +1,129 @@ +package driver + +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
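The driver.go hunk above replaces the pod creation/deletion expectations with generation-based expectations (implemented in generation.go further down in this diff). A condensed sketch of the intended usage, assuming the repository's `k8s.Client` wrapper and the `defaultDriver` fields shown in this diff; `updateAndExpect` is a hypothetical helper name, and imports are elided:

```go
// After writing a StatefulSet, remember the generation we produced: the apiserver bumps
// metadata.generation on spec changes, so any cached copy with a lower generation is stale.
func (d *defaultDriver) updateAndExpect(statefulSet *appsv1.StatefulSet) error {
	if err := d.Client.Update(statefulSet); err != nil {
		return err
	}
	d.expectations.ExpectGeneration(statefulSet.ObjectMeta)
	return nil
}

// Before acting on cached StatefulSets, requeue if any of them is older than what we wrote;
// doRollingUpgrade and MaybeEnableShardsAllocation below perform this exact check:
//
//	if !d.expectations.GenerationExpected(actualStatefulSets.ObjectMetas()...) {
//		return results.WithResult(defaultRequeue)
//	}
```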
+ +import ( + "context" + "sync" + + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + esclient "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/stringsutil" +) + +type ESState interface { + NodesInCluster(nodeNames []string) (bool, error) + ShardAllocationsEnabled() (bool, error) + GreenHealth() (bool, error) +} + +type LazyESState struct { + esClient esclient.Client + *lazyNodes + *lazyShardsAllocationEnabled + *lazyGreenHealth +} + +func NewLazyESState(esClient esclient.Client) ESState { + return &LazyESState{ + esClient: esClient, + lazyNodes: &lazyNodes{esClient: esClient}, + lazyShardsAllocationEnabled: &lazyShardsAllocationEnabled{esClient: esClient}, + lazyGreenHealth: &lazyGreenHealth{esClient: esClient}, + } +} + +func initOnce(once *sync.Once, f func() error) error { + var err error + once.Do(func() { + err = f() + }) + return err +} + +// -- Nodes + +type lazyNodes struct { + once sync.Once + esClient esclient.Client + nodes []string +} + +func (n *lazyNodes) initialize() error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + nodes, err := n.esClient.GetNodes(ctx) + if err != nil { + return err + } + n.nodes = nodes.Names() + return nil +} + +func (n *lazyNodes) nodeInCluster(nodeName string) (bool, error) { + if err := initOnce(&n.once, n.initialize); err != nil { + return false, err + } + return stringsutil.StringInSlice(nodeName, n.nodes), nil +} + +func (n *lazyNodes) NodesInCluster(nodeNames []string) (bool, error) { + if err := initOnce(&n.once, n.initialize); err != nil { + return false, err + } + return stringsutil.StringsInSlice(nodeNames, n.nodes), nil +} + +// -- Shards allocation enabled + +type lazyShardsAllocationEnabled struct { + once sync.Once + esClient esclient.Client + enabled bool +} + +func (s *lazyShardsAllocationEnabled) initialize() error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + allocationSettings, err := s.esClient.GetClusterRoutingAllocation(ctx) + if err != nil { + return err + } + s.enabled = allocationSettings.Transient.IsShardsAllocationEnabled() + return nil +} + +func (s *lazyShardsAllocationEnabled) ShardAllocationsEnabled() (bool, error) { + if err := initOnce(&s.once, s.initialize); err != nil { + return false, err + } + return s.enabled, nil +} + +// -- Green health + +type lazyGreenHealth struct { + once sync.Once + esClient esclient.Client + greenHealth bool +} + +func (h *lazyGreenHealth) initialize() error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + health, err := h.esClient.GetClusterHealth(ctx) + if err != nil { + return err + } + h.greenHealth = health.Status == string(v1alpha1.ElasticsearchGreenHealth) + return nil +} + +func (h *lazyGreenHealth) GreenHealth() (bool, error) { + if err := initOnce(&h.once, h.initialize); err != nil { + return false, err + } + return h.greenHealth, nil +} diff --git a/operators/pkg/controller/elasticsearch/driver/generation.go b/operators/pkg/controller/elasticsearch/driver/generation.go new file mode 100644 index 0000000000..5bedbb063e --- /dev/null +++ b/operators/pkg/controller/elasticsearch/driver/generation.go @@ -0,0 +1,34 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +// TODO: garbage collect/finalize deprecated UIDs +type Expectations struct { + generations map[types.UID]int64 +} + +func NewGenerationExpectations() *Expectations { + return &Expectations{ + generations: make(map[types.UID]int64), + } +} + +func (e *Expectations) ExpectGeneration(meta metav1.ObjectMeta) { + e.generations[meta.UID] = meta.Generation +} + +func (e *Expectations) GenerationExpected(metaObjs ...metav1.ObjectMeta) bool { + for _, meta := range metaObjs { + if expectedGen, exists := e.generations[meta.UID]; exists && meta.Generation < expectedGen { + return false + } + } + return true +} diff --git a/operators/pkg/controller/elasticsearch/driver/upgrade.go b/operators/pkg/controller/elasticsearch/driver/upgrade.go new file mode 100644 index 0000000000..3f49b3a025 --- /dev/null +++ b/operators/pkg/controller/elasticsearch/driver/upgrade.go @@ -0,0 +1,305 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package driver + +import ( + "context" + + "github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/common/reconciler" + esclient "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client" + "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/sset" + "github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" +) + +func (d *defaultDriver) handleRollingUpgrades( + es v1alpha1.Elasticsearch, + esClient esclient.Client, + statefulSets sset.StatefulSetList, +) *reconciler.Results { + results := &reconciler.Results{} + + // We need an up-to-date ES state, but avoid requesting information we may not need. + esState := NewLazyESState(esClient) + + // Maybe upgrade some of the nodes. + res := d.doRollingUpgrade(es, statefulSets, esClient, esState) + results.WithResults(res) + + // Maybe re-enable shards allocation if upgraded nodes are back into the cluster. + res = d.MaybeEnableShardsAllocation(es, esClient, esState, statefulSets) + results.WithResults(res) + + return results +} + +func (d *defaultDriver) doRollingUpgrade( + es v1alpha1.Elasticsearch, + statefulSets sset.StatefulSetList, + esClient esclient.Client, + esState ESState, +) *reconciler.Results { + results := &reconciler.Results{} + + if !d.expectations.GenerationExpected(statefulSets.ObjectMetas()...) { + // Our cache of SatefulSets is out of date compared to previous reconciliation operations. + // It does not matter much here since operations are idempotent, but we might as well avoid + // useless operations that would end up in a resource update conflict anyway. + log.V(1).Info("StatefulSet cache out-of-date, re-queueing") + return results.WithResult(defaultRequeue) + } + + if !statefulSets.RevisionUpdateScheduled() { + // nothing to upgrade + return results + } + + // TODO: deal with multiple restarts at once, taking the changeBudget into account. 
+ // We'd need to stop checking cluster health and do something smarter, since cluster health green check + // should be done **in between** restarts to make sense, which is pretty hard to do since we don't + // trigger restarts but just allow the sset controller to do it at its own pace. + // Instead of green health, we could look at shards status, taking into account nodes + // we scheduled for a restart (maybe not restarted yet). + + // TODO: don't upgrade more than 1 master concurrently (ok for now since we upgrade 1 node at a time anyway) + + maxConcurrentUpgrades := 1 + scheduledUpgrades := 0 + + for i, statefulSet := range statefulSets { + // Inspect each pod, starting from the highest ordinal, and decrement the partition to allow + // pod upgrades to go through, controlled by the StatefulSet controller. + for partition := sset.GetUpdatePartition(statefulSet); partition >= 0; partition-- { + if scheduledUpgrades >= maxConcurrentUpgrades { + return results.WithResult(defaultRequeue) + } + if partition >= sset.Replicas(statefulSet) { + continue + } + + // Do we need to upgrade that pod? + podName := sset.PodName(statefulSet.Name, partition) + podRef := types.NamespacedName{Namespace: statefulSet.Namespace, Name: podName} + alreadyUpgraded, err := podUpgradeDone(d.Client, esState, podRef, statefulSet.Status.UpdateRevision) + if err != nil { + return results.WithError(err) + } + if alreadyUpgraded { + continue + } + + // An upgrade is required for that pod. + scheduledUpgrades++ + + // Is the pod upgrade already scheduled? + if partition == sset.GetUpdatePartition(statefulSet) { + continue + } + + // Is the cluster ready for the node upgrade? + clusterReady, err := clusterReadyForNodeRestart(es, esState) + if err != nil { + return results.WithError(err) + } + if !clusterReady { + // retry later + return results.WithResult(defaultRequeue) + } + + log.Info("Preparing cluster for node restart", "namespace", es.Namespace, "name", es.Name) + if err := prepareClusterForNodeRestart(esClient, esState); err != nil { + return results.WithError(err) + } + + // Upgrade the pod. + if err := d.upgradeStatefulSetPartition(es, &statefulSets[i], esClient, partition); err != nil { + return results.WithError(err) + } + scheduledUpgrades++ + } + } + return results +} + +func (d *defaultDriver) upgradeStatefulSetPartition( + es v1alpha1.Elasticsearch, + statefulSet *appsv1.StatefulSet, + esClient esclient.Client, + newPartition int32, +) error { + // TODO: zen1, zen2 + + // Node can be removed, update the StatefulSet rollingUpdate.Partition ordinal. + log.Info("Updating rollingUpdate.Partition", + "namespace", statefulSet.Namespace, + "name", statefulSet.Name, + "from", statefulSet.Spec.UpdateStrategy.RollingUpdate.Partition, + "to", &newPartition, + ) + statefulSet.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{ + Partition: &newPartition, + } + if err := d.Client.Update(statefulSet); err != nil { + return err + } + + // Register the updated sset generation to deal with out-of-date sset cache. 
+ d.Expectations.ExpectGeneration(statefulSet.ObjectMeta) + + return nil +} + +func prepareClusterForNodeRestart(esClient esclient.Client, esState ESState) error { + // Disable shard allocations to avoid shards moving around while the node is temporarily down + shardsAllocationEnabled, err := esState.ShardAllocationsEnabled() + if err != nil { + return err + } + if shardsAllocationEnabled { + if err := disableShardsAllocation(esClient); err != nil { + return err + } + } + + // Request a sync flush to optimize indices recovery when the node restarts. + if err := doSyncFlush(esClient); err != nil { + return err + } + + // TODO: halt ML jobs on that node + return nil +} + +// clusterReadyForNodeRestart returns true if the ES cluster allows a node to be restarted +// with minimized downtime and no unexpected data loss. +func clusterReadyForNodeRestart(es v1alpha1.Elasticsearch, esState ESState) (bool, error) { + // Check the cluster health: only allow node restart if health is green. + // This would cause downtime if some shards have 0 replicas, but we consider that's on the user. + // TODO: we could technically still restart a node if the cluster is yellow, + // as long as there are other copies of the shards in-sync on other nodes + // TODO: the fact we rely on a cached health here would prevent more than 1 restart + // in a single reconciliation + green, err := esState.GreenHealth() + if err != nil { + return false, err + } + if !green { + log.Info("Skipping node rolling upgrade since cluster is not green", "namespace", es.Namespace, "name", es.Name) + return false, nil + } + return true, nil +} + +// podUpgradeDone inspects the given pod and returns true if it was successfully upgraded. +func podUpgradeDone(c k8s.Client, esState ESState, podRef types.NamespacedName, expectedRevision string) (bool, error) { + if expectedRevision == "" { + // no upgrade scheduled for the sset + return false, nil + } + // retrieve pod to inspect its revision label + var pod corev1.Pod + err := c.Get(podRef, &pod) + if err != nil && !errors.IsNotFound(err) { + return false, err + } + if errors.IsNotFound(err) || !pod.DeletionTimestamp.IsZero() { + // pod is terminating + return false, nil + } + if sset.PodRevision(pod) != expectedRevision { + // pod revision does not match the sset upgrade revision + return false, nil + } + // is the pod ready? + if !k8s.IsPodReady(pod) { + return false, nil + } + // has the node joined the cluster yet? + inCluster, err := esState.NodesInCluster([]string{podRef.Name}) + if err != nil { + return false, err + } + if !inCluster { + log.V(1).Info("Node has not joined the cluster yet", "namespace", podRef.Namespace, "name", podRef.Name) + return false, err + } + return true, nil +} + +func disableShardsAllocation(esClient esclient.Client) error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + return esClient.DisableReplicaShardsAllocation(ctx) +} + +func doSyncFlush(esClient esclient.Client) error { + ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout) + defer cancel() + return esClient.SyncedFlush(ctx) +} + +func (d *defaultDriver) MaybeEnableShardsAllocation( + es v1alpha1.Elasticsearch, + esClient esclient.Client, + esState ESState, + statefulSets sset.StatefulSetList, +) *reconciler.Results { + results := &reconciler.Results{} + // Since we rely on sset rollingUpdate.Partition, requeue in case our cache hasn't seen a sset update yet. 
+	// Otherwise we could re-enable shards allocation while a pod was just scheduled for termination,
+	// with the partition in the sset cache being outdated.
+	if !d.Expectations.GenerationExpected(statefulSets.ObjectMetas()...) {
+		return results.WithResult(defaultRequeue)
+	}
+
+	alreadyEnabled, err := esState.ShardAllocationsEnabled()
+	if err != nil {
+		return results.WithError(err)
+	}
+	if alreadyEnabled {
+		return results
+	}
+
+	// Make sure all pods scheduled for upgrade have been upgraded.
+	scheduledUpgradesDone, err := sset.ScheduledUpgradesDone(d.Client, statefulSets)
+	if err != nil {
+		return results.WithError(err)
+	}
+	if !scheduledUpgradesDone {
+		log.V(1).Info(
+			"Rolling upgrade not over yet, some pods don't have the updated revision, keeping shard allocations disabled",
+			"namespace", es.Namespace,
+			"name", es.Name,
+		)
+		return results.WithResult(defaultRequeue)
+	}
+
+	// Make sure all nodes scheduled for upgrade are back in the cluster.
+	nodesInCluster, err := esState.NodesInCluster(statefulSets.PodNames())
+	if err != nil {
+		return results.WithError(err)
+	}
+	if !nodesInCluster {
+		log.V(1).Info(
+			"Some upgraded nodes are not back in the cluster yet, keeping shard allocations disabled",
+			"namespace", es.Namespace,
+			"name", es.Name,
+		)
+		return results.WithResult(defaultRequeue)
+	}
+
+	log.Info("Enabling shards allocation", "namespace", es.Namespace, "name", es.Name)
+	ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout)
+	defer cancel()
+	if err := esClient.EnableShardAllocation(ctx); err != nil {
+		return results.WithError(err)
+	}
+
+	return results
+}
diff --git a/operators/pkg/controller/elasticsearch/elasticsearch_controller.go b/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
index 002075b076..f4ebf7c7d0 100644
--- a/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
+++ b/operators/pkg/controller/elasticsearch/elasticsearch_controller.go
@@ -67,9 +67,9 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) (*ReconcileE
 		esObservers: observer.NewManager(params.Dialer, client, observer.DefaultSettings),
 
-		finalizers:       finalizer.NewHandler(client),
-		dynamicWatches:   watches.NewDynamicWatches(),
-		podsExpectations: reconciler.NewExpectations(),
+		finalizers:     finalizer.NewHandler(client),
+		dynamicWatches: watches.NewDynamicWatches(),
+		expectations:   driver.NewGenerationExpectations(),
 
 		Parameters: params,
 	}, nil
@@ -166,9 +166,9 @@ type ReconcileElasticsearch struct {
 	dynamicWatches watches.DynamicWatches
 
-	// podsExpectations help dealing with inconsistencies in our client cache,
-	// by marking Pods creation/deletion as expected, and waiting til they are effectively observed.
-	podsExpectations *reconciler.Expectations
+	// expectations help deal with inconsistencies in our client cache,
+	// by marking resource updates as expected, and skipping some operations if the cache is not up-to-date.
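+	// For StatefulSet rolling upgrades, the driver registers the sset generation it just updated
+	// (ExpectGeneration) and checks it (GenerationExpected) before re-enabling shards allocation.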
+	expectations *driver.Expectations
 
 	// iteration is the number of times this controller has run its Reconcile method
 	iteration int64
@@ -250,10 +250,10 @@ func (r *ReconcileElasticsearch) internalReconcile(
 		Version:          *ver,
-		Observers:        r.esObservers,
-		DynamicWatches:   r.dynamicWatches,
-		PodsExpectations: r.podsExpectations,
-		Parameters:       r.Parameters,
+		Expectations:   r.expectations,
+		Observers:      r.esObservers,
+		DynamicWatches: r.dynamicWatches,
+		Parameters:     r.Parameters,
 	})
 	if err != nil {
 		return results.WithError(err)
 	}
@@ -285,7 +285,6 @@ func (r *ReconcileElasticsearch) finalizersFor(
 ) []finalizer.Finalizer {
 	clusterName := k8s.ExtractNamespacedName(&es)
 	return []finalizer.Finalizer{
-		reconciler.ExpectationsFinalizer(clusterName, r.podsExpectations),
 		r.esObservers.Finalizer(clusterName),
 		settings.SecureSettingsFinalizer(clusterName, watched),
 		http.DynamicWatchesFinalizer(r.dynamicWatches, es.Name, esname.ESNamer),
diff --git a/operators/pkg/controller/elasticsearch/restart/elasticsearch.go b/operators/pkg/controller/elasticsearch/restart/elasticsearch.go
index 4a2e46f2d0..a4ef705d5c 100644
--- a/operators/pkg/controller/elasticsearch/restart/elasticsearch.go
+++ b/operators/pkg/controller/elasticsearch/restart/elasticsearch.go
@@ -18,7 +18,7 @@ func prepareClusterForStop(esClient client.Client) error {
 	log.V(1).Info("Disabling shards allocation for coordinated restart")
 	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultReqTimeout)
 	defer cancel()
-	if err := esClient.DisableShardAllocation(ctx); err != nil {
+	if err := esClient.DisableReplicaShardsAllocation(ctx); err != nil {
 		return err
 	}
diff --git a/operators/pkg/controller/elasticsearch/sset/list.go b/operators/pkg/controller/elasticsearch/sset/list.go
index 9c357436f7..60e5279c87 100644
--- a/operators/pkg/controller/elasticsearch/sset/list.go
+++ b/operators/pkg/controller/elasticsearch/sset/list.go
@@ -5,12 +5,12 @@
 package sset
 
 import (
+	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
+	"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
 	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/label"
-	"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
 )
 
 type StatefulSetList []appsv1.StatefulSet
@@ -33,3 +33,41 @@ func (l StatefulSetList) GetByName(ssetName string) (appsv1.StatefulSet, bool) {
 	}
 	return appsv1.StatefulSet{}, false
 }
+
+// ObjectMetas returns the ObjectMeta of all StatefulSets in the list.
+func (l StatefulSetList) ObjectMetas() []metav1.ObjectMeta {
+	objs := make([]metav1.ObjectMeta, 0, len(l))
+	for _, sset := range l {
+		objs = append(objs, sset.ObjectMeta)
+	}
+	return objs
+}
+
+// RevisionUpdateScheduled returns true if at least one revision update is scheduled.
+func (l StatefulSetList) RevisionUpdateScheduled() bool {
+	for _, s := range l {
+		if s.Status.UpdateRevision != "" && s.Status.UpdateRevision != s.Status.CurrentRevision {
+			return true
+		}
+	}
+	return false
+}
+
+// PodNames returns the names of the pods for all StatefulSets in the list.
+func (l StatefulSetList) PodNames() []string {
+	var names []string
+	for _, s := range l {
+		names = append(names, PodNames(s)...)
+	}
+	return names
+}
+
+// GetUpdatePartition returns the updateStrategy.Partition index, or falls back to the number of replicas if not set.
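+// A partition of N means the StatefulSet controller may upgrade pods whose ordinal is greater than or
+// equal to N, while pods with a lower ordinal keep their current revision until the partition is lowered.
+// For example, with 3 replicas and a partition of 2, only the pod with ordinal 2 is allowed to be upgraded.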
+func GetUpdatePartition(statefulSet appsv1.StatefulSet) int32 {
+	if statefulSet.Spec.UpdateStrategy.RollingUpdate != nil &&
+		statefulSet.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
+		return *statefulSet.Spec.UpdateStrategy.RollingUpdate.Partition
+	}
+	if statefulSet.Spec.Replicas != nil {
+		return *statefulSet.Spec.Replicas
+	}
+	return 0
+}
diff --git a/operators/pkg/controller/elasticsearch/sset/pod.go b/operators/pkg/controller/elasticsearch/sset/pod.go
index 9af5d8074f..bd6ec6fe64 100644
--- a/operators/pkg/controller/elasticsearch/sset/pod.go
+++ b/operators/pkg/controller/elasticsearch/sset/pod.go
@@ -4,8 +4,57 @@
 
 package sset
 
-import "fmt"
+import (
+	"fmt"
 
-func PodName(ssetName string, ordinal int) string {
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
+
+	"github.com/elastic/cloud-on-k8s/operators/pkg/utils/k8s"
+)
+
+// PodName returns the name of the pod with the given ordinal for the given StatefulSet name.
+func PodName(ssetName string, ordinal int32) string {
 	return fmt.Sprintf("%s-%d", ssetName, ordinal)
 }
+
+// PodNames returns the names of all pods of the given StatefulSet, based on its number of replicas.
+func PodNames(sset appsv1.StatefulSet) []string {
+	names := make([]string, 0, Replicas(sset))
+	for i := int32(0); i < Replicas(sset); i++ {
+		names = append(names, PodName(sset.Name, i))
+	}
+	return names
+}
+
+// PodRevision returns the StatefulSet revision the given pod was created from.
+func PodRevision(pod corev1.Pod) string {
+	return pod.Labels[appsv1.StatefulSetRevisionLabel]
+}
+
+// ScheduledUpgradesDone returns true if all pods scheduled for upgrade have been upgraded.
+// This is done by checking the revision of pods whose ordinal is greater than or equal to the StatefulSet
+// rollingUpdate.Partition index.
+func ScheduledUpgradesDone(c k8s.Client, statefulSets StatefulSetList) (bool, error) {
+	for _, s := range statefulSets {
+		if s.Status.UpdateRevision == "" {
+			// no upgrade scheduled
+			continue
+		}
+		partition := GetUpdatePartition(s)
+		for i := Replicas(s) - 1; i >= partition; i-- {
+			var pod corev1.Pod
+			err := c.Get(types.NamespacedName{Namespace: s.Namespace, Name: PodName(s.Name, i)}, &pod)
+			if errors.IsNotFound(err) {
+				// pod probably being terminated
+				return false, nil
+			}
+			if err != nil {
+				return false, err
+			}
+			if PodRevision(pod) != s.Status.UpdateRevision {
+				return false, nil
+			}
+		}
+	}
+	return true, nil
+}
diff --git a/operators/pkg/utils/stringsutil/strings.go b/operators/pkg/utils/stringsutil/strings.go
index fa713eff27..2d9a95ff30 100644
--- a/operators/pkg/utils/stringsutil/strings.go
+++ b/operators/pkg/utils/stringsutil/strings.go
@@ -29,6 +29,20 @@ func StringInSlice(str string, list []string) bool {
 	return false
 }
 
+// StringsInSlice returns true if all the given strings are found in the provided slice, else returns false.
+func StringsInSlice(strings []string, slice []string) bool {
+	asMap := make(map[string]struct{}, len(slice))
+	for _, s := range slice {
+		asMap[s] = struct{}{}
+	}
+	for _, s := range strings {
+		if _, exists := asMap[s]; !exists {
+			return false
+		}
+	}
+	return true
+}
+
 // RemoveStringInSlice returns a new slice with all occurrences of s removed,
 // keeping the given slice unmodified
 func RemoveStringInSlice(s string, slice []string) []string {