Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
sebgl committed Jul 12, 2019
1 parent 719a9aa commit 5c95c85
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 211 deletions.
136 changes: 3 additions & 133 deletions operators/pkg/controller/elasticsearch/driver/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package driver

import (
"context"
"crypto/x509"
"fmt"

Expand All @@ -17,7 +16,6 @@ import (

"github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/migration"
esvolume "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/volume"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/stringsutil"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
"github.com/elastic/cloud-on-k8s/operators/pkg/controller/common"
Expand Down Expand Up @@ -304,31 +302,6 @@ func (d *defaultDriver) Reconcile(
// results.WithResult(defaultRequeue)
// }
//}
//
//// List the orphaned PVCs before the Pods are created.
//// If there are some orphaned PVCs they will be adopted and remove sequentially from the list when Pods are created.
//orphanedPVCs, err := pvc.FindOrphanedVolumeClaims(d.Client, es)
//if err != nil {
// return results.WithError(err)
//}
//
//for _, change := range performableChanges.ToCreate {
// d.PodsExpectations.ExpectCreation(namespacedName)
// if err := createElasticsearchPod(
// d.Client,
// d.Scheme,
// es,
// reconcileState,
// change.Pod,
// change.PodSpecCtx,
// orphanedPVCs,
// ); err != nil {
// // pod was not created, cancel our expectation by marking it observed
// d.PodsExpectations.CreationObserved(namespacedName)
// return results.WithError(err)
// }
//}
// passed this point, any pods resource listing should check expectations first

if !esReachable {
// We cannot manipulate ES allocation exclude settings if the ES cluster
Expand All @@ -353,107 +326,12 @@ func (d *defaultDriver) Reconcile(
// return results.WithResult(defaultRequeue).WithError(err)
// }
//}
//
//if !changes.HasChanges() {
// // Current state matches expected state
// reconcileState.UpdateElasticsearchOperational(*resourcesState, observedState)
// return results
//}
//
//// Start migrating data away from all pods to be deleted
//leavingNodeNames := pod.PodListToNames(performableChanges.ToDelete.Pods())
//if err = migration.MigrateData(esClient, leavingNodeNames); err != nil {
// return results.WithError(errors.Wrap(err, "error during migrate data"))
//}
//
//// Shrink clusters by deleting deprecated pods
//if err = d.attemptPodsDeletion(
// performableChanges,
// reconcileState,
// resourcesState,
// observedState,
// results,
// esClient,
// es,
//); err != nil {
// return results.WithError(err)
//}
//// past this point, any pods resource listing should check expectations first
//
//if changes.HasChanges() && !performableChanges.HasChanges() {
// // if there are changes we'd like to perform, but none that were performable, we try again later
// results.WithResult(defaultRequeue)
//}

reconcileState.UpdateElasticsearchState(*resourcesState, observedState)

return results
}

//
//// attemptPodsDeletion deletes a list of pods after checking there is no migrating data for each of them
//func (d *defaultDriver) attemptPodsDeletion(
// changes *mutation.PerformableChanges,
// reconcileState *reconcile.State,
// resourcesState *reconcile.ResourcesState,
// observedState observer.State,
// results *reconciler.Results,
// esClient esclient.Client,
// elasticsearch v1alpha1.Elasticsearch,
//) error {
// newState := make([]corev1.Pod, len(resourcesState.CurrentPods))
// copy(newState, resourcesState.CurrentPods.Pods())
// for _, pod := range changes.ToDelete.Pods() {
// newState = removePodFromList(newState, pod)
// preDelete := func() error {
// if d.zen1SettingsUpdater != nil {
// requeue, err := d.zen1SettingsUpdater(
// elasticsearch,
// d.Client,
// esClient,
// newState,
// changes,
// reconcileState)
//
// if err != nil {
// return err
// }
//
// if requeue {
// results.WithResult(defaultRequeue)
// }
// }
// return nil
// }
//
// // do not delete a pod or expect a deletion if a data migration is in progress
// isMigratingData := migration.IsMigratingData(observedState, pod, changes.ToDelete.Pods())
// if isMigratingData {
// log.Info("Skipping deletion because of migrating data", "pod", pod.Name)
// reconcileState.UpdateElasticsearchMigrating(*resourcesState, observedState)
// results.WithResult(defaultRequeue)
// continue
// }
//
// namespacedName := k8s.ExtractNamespacedName(&elasticsearch)
// d.PodsExpectations.ExpectDeletion(namespacedName)
// result, err := deleteElasticsearchPod(
// d.Client,
// reconcileState,
// *resourcesState,
// pod,
// preDelete,
// )
// if err != nil {
// // pod was not deleted, cancel our expectation by marking it observed
// d.PodsExpectations.DeletionObserved(namespacedName)
// return err
// }
// results.WithResult(result)
// }
// return nil
//}

func (d *defaultDriver) reconcileNodeSpecs(
es v1alpha1.Elasticsearch,
esReachable bool,
Expand Down Expand Up @@ -564,16 +442,6 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results
}

func NodesInTheCluster(esClient esclient.Client, expectedNodes []string) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout)
defer cancel()
clusterNodes, err := esClient.GetNodes(ctx)
if err != nil {
return false, err
}
return stringsutil.StringsInSlice(expectedNodes, clusterNodes.Names()), nil
}

func (d *defaultDriver) scaleStatefulSetDown(
statefulSet *appsv1.StatefulSet,
targetReplicas int32,
Expand Down Expand Up @@ -634,7 +502,9 @@ func (d *defaultDriver) scaleStatefulSetDown(
}
}

return nil
// TODO: clear allocation excludes

return results
}

// newElasticsearchClient creates a new Elasticsearch HTTP client for this cluster using the provided user
Expand Down
129 changes: 129 additions & 0 deletions operators/pkg/controller/elasticsearch/driver/esstate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package driver

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

import (
"context"
"sync"

"github.com/elastic/cloud-on-k8s/operators/pkg/apis/elasticsearch/v1alpha1"
esclient "github.com/elastic/cloud-on-k8s/operators/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/operators/pkg/utils/stringsutil"
)

type ESState interface {
NodesInCluster(nodeNames []string) (bool, error)
ShardAllocationsEnabled() (bool, error)
GreenHealth() (bool, error)
}

type LazyESState struct {
esClient esclient.Client
*lazyNodes
*lazyShardsAllocationEnabled
*lazyGreenHealth
}

func NewLazyESState(esClient esclient.Client) ESState {
return &LazyESState{
esClient: esClient,
lazyNodes: &lazyNodes{esClient: esClient},
lazyShardsAllocationEnabled: &lazyShardsAllocationEnabled{esClient: esClient},
lazyGreenHealth: &lazyGreenHealth{esClient: esClient},
}
}

func initOnce(once *sync.Once, f func() error) error {
var err error
once.Do(func() {
err = f()
})
return err
}

// -- Nodes

type lazyNodes struct {
once sync.Once
esClient esclient.Client
nodes []string
}

func (n *lazyNodes) initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout)
defer cancel()
nodes, err := n.esClient.GetNodes(ctx)
if err != nil {
return err
}
n.nodes = nodes.Names()
return nil
}

func (n *lazyNodes) nodeInCluster(nodeName string) (bool, error) {
if err := initOnce(&n.once, n.initialize); err != nil {
return false, err
}
return stringsutil.StringInSlice(nodeName, n.nodes), nil
}

func (n *lazyNodes) NodesInCluster(nodeNames []string) (bool, error) {
if err := initOnce(&n.once, n.initialize); err != nil {
return false, err
}
return stringsutil.StringsInSlice(nodeNames, n.nodes), nil
}

// -- Shards allocation enabled

type lazyShardsAllocationEnabled struct {
once sync.Once
esClient esclient.Client
enabled bool
}

func (s *lazyShardsAllocationEnabled) initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout)
defer cancel()
allocationSettings, err := s.esClient.GetClusterRoutingAllocation(ctx)
if err != nil {
return err
}
s.enabled = allocationSettings.Transient.IsShardsAllocationEnabled()
return nil
}

func (s *lazyShardsAllocationEnabled) ShardAllocationsEnabled() (bool, error) {
if err := initOnce(&s.once, s.initialize); err != nil {
return false, err
}
return s.enabled, nil
}

// -- Green health

type lazyGreenHealth struct {
once sync.Once
esClient esclient.Client
greenHealth bool
}

func (h *lazyGreenHealth) initialize() error {
ctx, cancel := context.WithTimeout(context.Background(), esclient.DefaultReqTimeout)
defer cancel()
health, err := h.esClient.GetClusterHealth(ctx)
if err != nil {
return err
}
h.greenHealth = health.Status == string(v1alpha1.ElasticsearchGreenHealth)
return nil
}

func (h *lazyGreenHealth) GreenHealth() (bool, error) {
if err := initOnce(&h.once, h.initialize); err != nil {
return false, err
}
return h.greenHealth, nil
}
Loading

0 comments on commit 5c95c85

Please sign in to comment.