Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix how cluster.initial_master_nodes is set #2315

Merged
merged 5 commits into from
Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver
package bootstrap

import (
esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version/zen2"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

var log = logf.Log.WithName("elasticsearch-uuid")

const (
// ClusterUUIDAnnotationName used to store the cluster UUID as an annotation when cluster has been bootstrapped.
ClusterUUIDAnnotationName = "elasticsearch.k8s.elastic.co/cluster-uuid"
Expand All @@ -25,55 +26,18 @@ func AnnotatedForBootstrap(cluster esv1.Elasticsearch) bool {
}

func ReconcileClusterUUID(c k8s.Client, cluster *esv1.Elasticsearch, observedState observer.State) error {
reBootstrap, err := clusterNeedsReBootstrap(c, cluster)
if err != nil {
return err
}

if AnnotatedForBootstrap(*cluster) {
if reBootstrap {
log.Info("cluster re-bootstrap necessary",
"version", cluster.Spec.Version,
"namespace", cluster.Namespace,
"name", cluster.Name,
)
return removeUUIDAnnotation(c, cluster)
}
// already annotated, nothing to do.
return nil
}
if clusterIsBootstrapped(observedState) && !reBootstrap {
if clusterIsBootstrapped(observedState) {
// cluster bootstrapped but not annotated yet
return annotateWithUUID(cluster, observedState, c)
}
// cluster not bootstrapped yet
return nil
}

func removeUUIDAnnotation(client k8s.Client, es *esv1.Elasticsearch) error {
annotations := es.Annotations
if annotations == nil {
return nil
}
delete(es.Annotations, ClusterUUIDAnnotationName)
return client.Update(es)
}

// clusterNeedsReBootstrap is true if we are updating a single master cluster from 6.x to 7.x
// because we lose the 'cluster' when rolling the single master node.
// Invariant: no grow and shrink
func clusterNeedsReBootstrap(client k8s.Client, es *esv1.Elasticsearch) (bool, error) {
initialZen2Upgrade, err := zen2.IsInitialZen2Upgrade(client, *es)
if err != nil {
return false, err
}
currentMasters, err := sset.GetActualMastersForCluster(client, *es)
if err != nil {
return false, err
}
return len(currentMasters) == 1 && initialZen2Upgrade, nil
}

// clusterIsBootstrapped returns true if the cluster has formed and has a UUID.
func clusterIsBootstrapped(observedState observer.State) bool {
return observedState.ClusterInfo != nil &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package driver
package bootstrap

import (
"testing"
Expand All @@ -12,7 +12,6 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/observer"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/sset"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,35 +27,13 @@ func bootstrappedES() *esv1.Elasticsearch {
}
}

func bootstrappedESWithChangeBudget(maxSurge, maxUnavailable *int32) *esv1.Elasticsearch {
es := bootstrappedES()
es.Spec.UpdateStrategy = esv1.UpdateStrategy{
ChangeBudget: esv1.ChangeBudget{
MaxSurge: maxSurge,
MaxUnavailable: maxUnavailable,
},
}

return es
}

func notBootstrappedES() *esv1.Elasticsearch {
return &esv1.Elasticsearch{
ObjectMeta: metav1.ObjectMeta{Name: "cluster"},
Spec: esv1.ElasticsearchSpec{Version: "7.3.0"},
}
}

func reBootstrappingES() *esv1.Elasticsearch {
return &esv1.Elasticsearch{
ObjectMeta: metav1.ObjectMeta{
Name: "cluster",
Annotations: map[string]string{},
},
Spec: esv1.ElasticsearchSpec{Version: "7.3.0"},
}
}

func TestAnnotatedForBootstrap(t *testing.T) {
require.True(t, AnnotatedForBootstrap(*bootstrappedES()))
require.False(t, AnnotatedForBootstrap(*notBootstrappedES()))
Expand Down Expand Up @@ -147,33 +124,6 @@ func TestReconcileClusterUUID(t *testing.T) {
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: bootstrappedES(),
},
{
name: "annotated, bootstrapped, but needs re-bootstrapping due to single node upgrade ",
c: k8s.WrappedFakeClient(
bootstrappedES(),
sset.TestPod{
ClusterName: "cluster",
Version: "6.8.0",
Master: true,
}.BuildPtr()),
cluster: bootstrappedES(),
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: reBootstrappingES(),
},
{
name: "not annotated, bootstrapped, but still on pre-upgrade version",
c: k8s.WrappedFakeClient(
reBootstrappingES(),
sset.TestPod{
ClusterName: "cluster",
Version: "6.8.0",
Master: true,
}.BuildPtr(),
),
cluster: reBootstrappingES(),
observedState: observer.State{ClusterInfo: &client.Info{ClusterUUID: "uuid"}},
wantCluster: reBootstrappingES(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type Client interface {
GetNodes(ctx context.Context) (Nodes, error)
// GetNodesStats calls the _nodes/stats api to return a map(nodeName -> NodeStats)
GetNodesStats(ctx context.Context) (NodesStats, error)
// ClusterBootstrappedForZen2 returns true if the cluster is relying on zen2 orchestration.
ClusterBootstrappedForZen2(ctx context.Context) (bool, error)
// AddVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings.
//
// If timeout is the empty string, the default is used.
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/elasticsearch/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,3 +602,38 @@ func TestAPIError_Types(t *testing.T) {
})
}
}

func TestClient_ClusterBootstrappedForZen2(t *testing.T) {
tests := []struct {
expectedPath, version string
bootstrappedForZen2, wantErr bool
}{
{
expectedPath: "/_nodes/_master",
version: "6.8.0",
bootstrappedForZen2: false,
wantErr: false,
},
{
expectedPath: "/_nodes/_master",
version: "7.5.0",
bootstrappedForZen2: true,
wantErr: false,
},
}

for _, tt := range tests {
client := NewMockClient(version.MustParse(tt.version), func(req *http.Request) *http.Response {
require.Equal(t, tt.expectedPath, req.URL.Path)
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(strings.NewReader(fixtures.MasterNodeForVersion(tt.version))),
}
})
bootstrappedForZen2, err := client.ClusterBootstrappedForZen2(context.Background())
if (err != nil) != tt.wantErr {
t.Errorf("Client.DeleteVotingConfigExclusions() error = %v, wantErr %v", err, tt.wantErr)
}
require.Equal(t, tt.bootstrappedForZen2, bootstrappedForZen2)
}
}
17 changes: 14 additions & 3 deletions pkg/controller/elasticsearch/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package client

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/pkg/utils/stringsutil"
"github.com/pkg/errors"
)

// Info represents the response from /
Expand Down Expand Up @@ -66,16 +68,25 @@ func (n Nodes) Names() []string {

// Node partially models an Elasticsearch node retrieved from /_nodes
type Node struct {
Name string `json:"name"`
Roles []string `json:"roles"`
JVM struct {
Name string `json:"name"`
Version string `json:"version"`
Roles []string `json:"roles"`
JVM struct {
StartTimeInMillis int64 `json:"start_time_in_millis"`
Mem struct {
HeapMaxInBytes int `json:"heap_max_in_bytes"`
} `json:"mem"`
} `json:"jvm"`
}

func (n Node) isV7OrAbove() (bool, error) {
v, err := version.Parse(n.Version)
if err != nil {
return false, errors.Wrap(err, fmt.Sprintf("unable to parse node version %s", n.Version))
}
return v.Major >= 7, nil
}

// NodesStats partially models the response from a request to /_nodes/stats
type NodesStats struct {
Nodes map[string]NodeStats `json:"nodes"`
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/elasticsearch/client/test_fixtures/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,3 +725,18 @@ const (
}
`
)

func MasterNodeForVersion(version string) string {
return `
{
"cluster_name": "elasticsearch-sample",
"nodes": {
"ICZ6uWfXQ3iQMzz-oLMlNw": {
"name": "elasticsearch-sample-es-default-0",
"version": "` + version + `",
"roles": ["ingest", "master", "data", "ml"]
}
}
}
`
}
19 changes: 19 additions & 0 deletions pkg/controller/elasticsearch/client/v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client

import (
"context"
"fmt"
"net/http"
"net/url"

Expand Down Expand Up @@ -112,6 +113,24 @@ func (c *clientV6) DeleteVotingConfigExclusions(ctx context.Context, waitForRemo
return errors.New("Not supported in Elasticsearch 6.x")
}

func (c *clientV6) ClusterBootstrappedForZen2(ctx context.Context) (bool, error) {
// Look at the current master node of the cluster: if it's running version 7.x.x or above,
// the cluster has been bootstrapped.
// Even though c is a clientV6, it may be targeting a mixed v6/v7 having a v7 master.
var response Nodes
if err := c.get(ctx, "/_nodes/_master", &response); err != nil {
return false, err
}
if len(response.Nodes) != 1 {
return false, fmt.Errorf("GET /_nodes/_master returned %d nodes", len(response.Nodes))
sebgl marked this conversation as resolved.
Show resolved Hide resolved
}
for _, master := range response.Nodes {
return master.isV7OrAbove()
}
// should never happen since we ensured a single entry in the above map
return false, errors.New("no master found in ClusterBootstrappedForZen2")
}

func (c *clientV6) Request(ctx context.Context, r *http.Request) (*http.Response, error) {
newURL, err := url.Parse(stringsutil.Concat(c.Endpoint, r.URL.String()))
if err != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/controller/elasticsearch/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common"
commondriver "github.com/elastic/cloud-on-k8s/pkg/controller/common/driver"
Expand All @@ -19,6 +24,7 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/watches"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/bootstrap"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/certificates"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/cleanup"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
Expand All @@ -33,10 +39,6 @@ import (
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/user"
esversion "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/version"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
controller "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
Expand Down Expand Up @@ -229,7 +231,7 @@ func (d *defaultDriver) Reconcile() *reconciler.Results {
}

// set an annotation with the ClusterUUID, if bootstrapped
if err := ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
if err := bootstrap.ReconcileClusterUUID(d.Client, &d.ES, observedState); err != nil {
return results.WithError(err)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithResult(defaultRequeue)
}

// Update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
// Maybe update Zen1 minimum master nodes through the API, corresponding to the current nodes we have.
requeue, err := zen1.UpdateMinimumMasterNodes(d.Client, d.ES, esClient, actualStatefulSets)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Remove the zen2 bootstrap annotation if bootstrap is over.
requeue, err = zen2.RemoveZen2BootstrapAnnotation(d.Client, d.ES, esClient)
if err != nil {
return results.WithError(err)
}
if requeue {
results.WithResult(defaultRequeue)
}
// Maybe clear zen2 voting config exclusions.
requeue, err = zen2.ClearVotingConfigExclusions(d.ES, d.Client, esClient, actualStatefulSets)
if err != nil {
Expand Down
Loading