Skip to content

Commit

Permalink
Merge pull request #683 from vsliouniaev/main
Browse files Browse the repository at this point in the history
Allow setting a cluster offline
  • Loading branch information
shunki-fujita authored Jun 20, 2024
2 parents 883b108 + 4cc0ec8 commit 11adadd
Show file tree
Hide file tree
Showing 23 changed files with 260 additions and 138 deletions.
4 changes: 4 additions & 0 deletions api/v1beta2/mysqlcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ type MySQLClusterSpec struct {
// During container init moco-agent will set mysql admin interface is bound to localhost. The moco-agent will also
// communicate with mysqld over localhost when acting as a sidecar.
AgentUseLocalhost bool `json:"agentUseLocalhost,omitempty"`

// Offline sets the cluster offline, releasing compute resources. Data is not removed.
// +optional
Offline bool `json:"offline,omitempty"`
}

func (s MySQLClusterSpec) validateCreate() (admission.Warnings, field.ErrorList) {
Expand Down
4 changes: 4 additions & 0 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (bm *BackupManager) Backup(ctx context.Context) error {
return fmt.Errorf("failed to get pod list: %w", err)
}

if bm.cluster.Spec.Offline {
return fmt.Errorf("cluster is configured to be offline %s/%s", bm.cluster.Namespace, bm.cluster.Name)
}

if len(pods.Items) != int(bm.cluster.Spec.Replicas) {
return fmt.Errorf("too few Pods for %s/%s", bm.cluster.Namespace, bm.cluster.Name)
}
Expand Down
3 changes: 3 additions & 0 deletions charts/moco/templates/generated/crds/moco_crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2244,6 +2244,9 @@ spec:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
type: string
offline:
description: Offline sets the cluster offline, releasing comput
type: boolean
podTemplate:
description: PodTemplate is a `Pod` template for MySQL server c
properties:
Expand Down
5 changes: 4 additions & 1 deletion clustering/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (p *managerProcess) do(ctx context.Context) (bool, error) {

logFromContext(ctx).Info("cluster state is " + ss.State.String())
switch ss.State {
case StateOffline:
return false, nil
case StateCloning:
if p.isCloning(ctx, ss) {
return false, nil
Expand Down Expand Up @@ -281,6 +283,7 @@ func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error
case StateFailed:
case StateLost:
case StateIncomplete:
case StateOffline:
}

meta.SetStatusCondition(&cluster.Status.Conditions, updateCond(mocov1beta2.ConditionInitialized, initialized))
Expand Down Expand Up @@ -309,7 +312,7 @@ func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error

var syncedReplicas int
for _, pod := range ss.Pods {
if isPodReady(pod) {
if pod != nil && isPodReady(pod) {
syncedReplicas++
}
}
Expand Down
11 changes: 10 additions & 1 deletion clustering/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
StateDegraded
StateFailed
StateLost
StateOffline
)

// String returns a unique string for each ClusterState.
Expand All @@ -71,6 +72,8 @@ func (s ClusterState) String() string {
return "Failed"
case StateLost:
return "Lost"
case StateOffline:
return "Offline"
}

panic(int(s))
Expand Down Expand Up @@ -107,6 +110,8 @@ func (ss *StatusSet) Close() {
// It may also set `ss.NeedSwitch` and `ss.Candidate` for switchover.
func (ss *StatusSet) DecideState() {
switch {
case isOffline(ss):
ss.State = StateOffline
case isCloning(ss):
ss.State = StateCloning
case isRestoring(ss):
Expand Down Expand Up @@ -160,7 +165,7 @@ func (p *managerProcess) GatherStatus(ctx context.Context) (*StatusSet, error) {
return nil, fmt.Errorf("failed to list Pods: %w", err)
}

if int(cluster.Spec.Replicas) != len(pods.Items) {
if !cluster.Spec.Offline && int(cluster.Spec.Replicas) != len(pods.Items) {
return nil, fmt.Errorf("too few pods; only %d pods exist", len(pods.Items))
}
ss.Pods = make([]*corev1.Pod, cluster.Spec.Replicas)
Expand Down Expand Up @@ -547,6 +552,10 @@ func isLost(ss *StatusSet) bool {
return okReplicas <= (int(ss.Cluster.Spec.Replicas) / 2)
}

// isOffline reports whether the cluster is configured to be offline via
// spec.offline. An offline cluster is mapped to StateOffline by DecideState,
// which short-circuits the other state checks (cloning, restoring, ...).
func isOffline(ss *StatusSet) bool {
	return ss.Cluster.Spec.Offline
}

func needSwitch(pod *corev1.Pod) bool {
if pod.DeletionTimestamp != nil {
return true
Expand Down
4 changes: 4 additions & 0 deletions cmd/kubectl-moco/cmd/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func switchover(ctx context.Context, name string) error {
return err
}

if cluster.Spec.Offline {
return errors.New("offline cluster is not able to switch")
}

if cluster.Spec.Replicas == 1 {
return errors.New("single-instance cluster is not able to switch")
}
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/moco.cybozu.com_mysqlclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ spec:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
type: string
offline:
description: Offline sets the cluster offline, releasing comput
type: boolean
podTemplate:
description: PodTemplate is a `Pod` template for MySQL server c
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ spec:
description: 'MySQLConfigMapName is a `ConfigMap` name of MySQL '
nullable: true
type: string
offline:
description: Offline sets the cluster offline, releasing comput
type: boolean
podTemplate:
description: PodTemplate is a `Pod` template for MySQL server c
properties:
Expand Down
8 changes: 6 additions & 2 deletions controllers/mysqlcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,10 +705,14 @@ func (r *MySQLClusterReconciler) reconcileV1StatefulSet(ctx context.Context, req
return fmt.Errorf("failed to get StatefulSet %s/%s: %w", cluster.Namespace, cluster.PrefixedName(), err)
}

replicas := cluster.Spec.Replicas
if cluster.Spec.Offline {
replicas = 0
}
sts := appsv1ac.StatefulSet(cluster.PrefixedName(), cluster.Namespace).
WithLabels(labelSet(cluster, false)).
WithSpec(appsv1ac.StatefulSetSpec().
WithReplicas(cluster.Spec.Replicas).
WithReplicas(replicas).
WithSelector(metav1ac.LabelSelector().
WithMatchLabels(labelSet(cluster, false))).
WithPodManagementPolicy(appsv1.ParallelPodManagement).
Expand Down Expand Up @@ -972,7 +976,7 @@ func (r *MySQLClusterReconciler) reconcileV1PDB(ctx context.Context, req ctrl.Re
pdb.Namespace = cluster.Namespace
pdb.Name = cluster.PrefixedName()

if cluster.Spec.Replicas < 3 {
if cluster.Spec.Offline || cluster.Spec.Replicas < 3 {
err := r.Delete(ctx, pdb)
if err == nil {
log.Info("removed pod disruption budget")
Expand Down
41 changes: 40 additions & 1 deletion controllers/mysqlcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,45 @@ var _ = Describe("MySQLCluster reconciler", func() {
}).Should(Succeed())
})

// Verifies that setting spec.offline=true on a MySQLCluster causes the
// reconciler to scale the generated StatefulSet down to 0 replicas.
It("should scale down statefulset when offline", func() {
	cluster := testNewMySQLCluster("test")
	err := k8sClient.Create(ctx, cluster)
	Expect(err).NotTo(HaveOccurred())

	// Wait until the reconciler has created the StatefulSet with the
	// cluster's requested replica count (the online steady state).
	Eventually(func() error {
		sts := &appsv1.StatefulSet{}
		if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "moco-test"}, sts); err != nil {
			return err
		}
		if sts.Spec.Replicas == nil || *sts.Spec.Replicas != cluster.Spec.Replicas {
			return fmt.Errorf("replica count should match cluster")
		}
		return nil
	}).Should(Succeed())

	By("setting cluster offline")
	// Re-fetch before updating and retry via Eventually to tolerate
	// resource-version conflicts with the concurrently running reconciler.
	Eventually(func() error {
		cluster2 := &mocov1beta2.MySQLCluster{}
		if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(cluster), cluster2); err != nil {
			return err
		}
		cluster2.Spec.Offline = true
		return k8sClient.Update(ctx, cluster2)
	}).Should(Succeed())

	By("checking statefulset is scaled down")
	// An offline cluster must end up with exactly 0 StatefulSet replicas.
	Eventually(func() error {
		sts := &appsv1.StatefulSet{}
		if err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "test", Name: "moco-test"}, sts); err != nil {
			return err
		}
		if sts.Spec.Replicas == nil || *sts.Spec.Replicas != 0 {
			return fmt.Errorf("replica count should be 0 for offline cluster")
		}
		return nil
	}).Should(Succeed())
})

It("should sets ConditionStatefulSetReady to be true when StatefulSet is ready", func() {
cluster := testNewMySQLCluster("test")
err := k8sClient.Create(ctx, cluster)
Expand Down Expand Up @@ -1754,7 +1793,7 @@ var _ = Describe("MySQLCluster reconciler", func() {
return fmt.Errorf("condition does not exists")
}
if conditionStatefulSetReady.Status != metav1.ConditionTrue {
return fmt.Errorf("condition is not false")
return fmt.Errorf("condition is not true")
}
return nil
}).Should(Succeed())
Expand Down
1 change: 1 addition & 0 deletions docs/crd_mysqlcluster_v1beta2.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ MySQLClusterSpec defines the desired state of MySQLCluster
| restore | Restore is the specification to perform Point-in-Time-Recovery from existing cluster. If this field is not null, MOCO restores the data as specified and create a new cluster with the data. This field is not editable. | *[RestoreSpec](#restorespec) | false |
| disableSlowQueryLogContainer | DisableSlowQueryLogContainer controls whether to add a sidecar container named \"slow-log\" to output slow logs as the containers output. If set to true, the sidecar container is not added. The default is false. | bool | false |
| agentUseLocalhost | AgentUseLocalhost configures the mysqld interface to bind and be accessed over localhost instead of pod name. During container init moco-agent will set mysql admin interface is bound to localhost. The moco-agent will also communicate with mysqld over localhost when acting as a sidecar. | bool | false |
| offline | Offline sets the cluster offline, releasing compute resources. Data is not removed. | bool | false |

[Back to Custom Resources](#custom-resources)

Expand Down
10 changes: 1 addition & 9 deletions e2e/backup_with_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,6 @@ var _ = Context("backup with ObjectBucketName is set in environments variables",

It("should delete clusters", func() {
kubectlSafe(nil, "delete", "-n", "backup", "mysqlclusters", "--all")

Eventually(func(g Gomega) {
out, err := kubectl(nil, "get", "-n", "backup", "pod", "-o", "json")
g.Expect(err).NotTo(HaveOccurred())
pods := &corev1.PodList{}
err = json.Unmarshal(out, pods)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(len(pods.Items)).To(BeNumerically(">", 0), "wait until all Pods are deleted")
}).Should(Succeed())
verifyAllPodsDeleted("backup")
})
})
18 changes: 1 addition & 17 deletions e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package e2e

import (
_ "embed"
"encoding/json"
"errors"
"fmt"

mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -138,20 +136,6 @@ var _ = Context("failure", func() {

It("should delete clusters", func() {
kubectlSafe(nil, "delete", "-n", "failover", "mysqlclusters", "--all")

Eventually(func() error {
out, err := kubectl(nil, "get", "-n", "failover", "pod", "-o", "json")
if err != nil {
return err
}
pods := &corev1.PodList{}
if err := json.Unmarshal(out, pods); err != nil {
return err
}
if len(pods.Items) > 0 {
return errors.New("wait until all Pods are deleted")
}
return nil
}).Should(Succeed())
verifyAllPodsDeleted("failover")
})
})
18 changes: 1 addition & 17 deletions e2e/failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"sync"
Expand All @@ -12,7 +11,6 @@ import (
mocov1beta2 "github.com/cybozu-go/moco/api/v1beta2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -98,20 +96,6 @@ var _ = Context("failure", func() {

It("should delete clusters", func() {
kubectlSafe(nil, "delete", "-n", "failure", "mysqlclusters", "--all")

Eventually(func() error {
out, err := kubectl(nil, "get", "-n", "failure", "pod", "-o", "json")
if err != nil {
return err
}
pods := &corev1.PodList{}
if err := json.Unmarshal(out, pods); err != nil {
return err
}
if len(pods.Items) > 0 {
return errors.New("wait until all Pods are deleted")
}
return nil
}).Should(Succeed())
verifyAllPodsDeleted("failure")
})
})
17 changes: 1 addition & 16 deletions e2e/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
_ "embed"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -284,20 +283,6 @@ var _ = Context("lifecycle", func() {

It("should delete clusters", func() {
kubectlSafe(nil, "delete", "-n", "foo", "mysqlclusters", "--all")

Eventually(func() error {
out, err := kubectl(nil, "get", "-n", "foo", "pod", "-o", "json")
if err != nil {
return err
}
pods := &corev1.PodList{}
if err := json.Unmarshal(out, pods); err != nil {
return err
}
if len(pods.Items) > 0 {
return errors.New("wait until all Pods are deleted")
}
return nil
}).Should(Succeed())
verifyAllPodsDeleted("foo")
})
})
Loading

0 comments on commit 11adadd

Please sign in to comment.