Commit

chore: improve cluster conversion
wangyelei committed Dec 20, 2024
1 parent 274793f commit ade5c88
Showing 1 changed file with 146 additions and 0 deletions.
apis/apps/v1alpha1/cluster_conversion.go (146 additions, 0 deletions)
@@ -20,7 +20,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package v1alpha1

import (
"sort"
"strings"

"github.com/jinzhu/copier"
"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/conversion"
@@ -111,11 +115,62 @@ func (r *Cluster) changesToCluster(cluster *appsv1.Cluster) {
	// status
	// components
	//  - message: ComponentMessageMap -> map[string]string
	if len(r.Spec.ClusterDefRef) > 0 {
		cluster.Spec.ClusterDef = r.Spec.ClusterDefRef
	}

	// The legacy Halt policy maps to DoNotTerminate; every other termination
	// policy is carried over by name.
	if r.Spec.TerminationPolicy == Halt {
		cluster.Spec.TerminationPolicy = appsv1.DoNotTerminate
	} else {
		cluster.Spec.TerminationPolicy = appsv1.TerminationPolicyType(r.Spec.TerminationPolicy)
	}

	r.toClusterServices(cluster)

	if cluster.Spec.SchedulingPolicy == nil {
		var componentName string
		if len(r.Spec.ComponentSpecs) > 0 {
			componentName = r.Spec.ComponentSpecs[0].Name
		}
		// TODO: support converting the schedulingPolicy of the shardings
		cluster.Spec.SchedulingPolicy = r.toSchedulingPolicy(r.Spec.Affinity, r.Spec.Tolerations, componentName)
	}

	for i := range r.Spec.ComponentSpecs {
		compSpec := r.Spec.ComponentSpecs[i]
		r.toComponentSpec(compSpec, &cluster.Spec.ComponentSpecs[i], compSpec.Name)
	}
	// TODO: support converting the shardings
}

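// toClusterServices copies a legacy ShardingSelector into the converted
// service's ComponentSelector when no ComponentSelector was set; services
// that already define a ComponentSelector are left untouched.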
func (r *Cluster) toClusterServices(cluster *appsv1.Cluster) {
	for i := range r.Spec.Services {
		clusterSVC := r.Spec.Services[i]
		if len(clusterSVC.ShardingSelector) > 0 && len(clusterSVC.ComponentSelector) == 0 {
			cluster.Spec.Services[i].ComponentSelector = clusterSVC.ShardingSelector
		}
	}
}

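// toSchedulingPolicy translates the legacy per-component Affinity and
// Tolerations into an appsv1.SchedulingPolicy. It returns nil when there is
// nothing to convert (no component name, or neither an affinity nor any
// tolerations), so an empty legacy spec does not yield an empty policy object.
// Illustrative sketch with hypothetical values: an Affinity with
// NodeLabels {"disktype": "ssd"}, TopologyKeys ["kubernetes.io/hostname"] and
// PodAntiAffinity: Required produces a policy whose node affinity requires
// disktype=ssd and whose spread constraint uses WhenUnsatisfiable: DoNotSchedule.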
func (r *Cluster) toSchedulingPolicy(affinity *Affinity, tolerations []corev1.Toleration, compName string) *appsv1.SchedulingPolicy {
	if len(compName) == 0 {
		return nil
	}
	if affinity == nil && len(tolerations) == 0 {
		return nil
	}
	schedulingPolicy := &appsv1.SchedulingPolicy{}
	schedulingPolicy.Tolerations = tolerations

	schedulingPolicy.Affinity = convertToAffinity(r.Name, compName, affinity)
	schedulingPolicy.TopologySpreadConstraints = convertTopologySpreadConstraints4Legacy(r.Name, compName, affinity)
	return schedulingPolicy
}

func (r *Cluster) toComponentSpec(fromCompSpec ClusterComponentSpec, toCompSpec *appsv1.ClusterComponentSpec, componentName string) {
	if toCompSpec.SchedulingPolicy == nil {
		toCompSpec.SchedulingPolicy = r.toSchedulingPolicy(fromCompSpec.Affinity, fromCompSpec.Tolerations, componentName)
	}
}

func (r *Cluster) changesFromCluster(cluster *appsv1.Cluster) {
@@ -283,3 +338,94 @@ func (c *clusterConverter) toCluster(cluster *Cluster) {
}
}
}

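// convertToAffinity builds a corev1.Affinity from the legacy Affinity spec:
// NodeLabels become required node-affinity match expressions (a comma-separated
// value such as "a,b" turns into the allowed value list for the In operator),
// and each TopologyKey becomes a pod anti-affinity term selecting pods of the
// same cluster instance and component. Required anti-affinity maps to the
// required term list; anything else becomes preferred terms with weight 100.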
func convertToAffinity(clusterName, compName string, compAffinity *Affinity) *corev1.Affinity {
	if compAffinity == nil {
		return nil
	}
	affinity := new(corev1.Affinity)
	// Build NodeAffinity
	var matchExpressions []corev1.NodeSelectorRequirement
	nodeLabelKeys := maps.Keys(compAffinity.NodeLabels)
	// NodeLabels must be ordered
	sort.Strings(nodeLabelKeys)
	for _, key := range nodeLabelKeys {
		values := strings.Split(compAffinity.NodeLabels[key], ",")
		matchExpressions = append(matchExpressions, corev1.NodeSelectorRequirement{
			Key:      key,
			Operator: corev1.NodeSelectorOpIn,
			Values:   values,
		})
	}
	if len(matchExpressions) > 0 {
		nodeSelectorTerm := corev1.NodeSelectorTerm{
			MatchExpressions: matchExpressions,
		}
		affinity.NodeAffinity = &corev1.NodeAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
				NodeSelectorTerms: []corev1.NodeSelectorTerm{nodeSelectorTerm},
			},
		}
	}
	// Build PodAntiAffinity
	var podAntiAffinity *corev1.PodAntiAffinity
	var podAffinityTerms []corev1.PodAffinityTerm
	for _, topologyKey := range compAffinity.TopologyKeys {
		podAffinityTerms = append(podAffinityTerms, corev1.PodAffinityTerm{
			TopologyKey: topologyKey,
			LabelSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app.kubernetes.io/instance":        clusterName,
					"apps.kubeblocks.io/component-name": compName,
				},
			},
		})
	}
	if compAffinity.PodAntiAffinity == Required {
		podAntiAffinity = &corev1.PodAntiAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
		}
	} else {
		var weightedPodAffinityTerms []corev1.WeightedPodAffinityTerm
		for _, podAffinityTerm := range podAffinityTerms {
			weightedPodAffinityTerms = append(weightedPodAffinityTerms, corev1.WeightedPodAffinityTerm{
				Weight:          100,
				PodAffinityTerm: podAffinityTerm,
			})
		}
		podAntiAffinity = &corev1.PodAntiAffinity{
			PreferredDuringSchedulingIgnoredDuringExecution: weightedPodAffinityTerms,
		}
	}
	affinity.PodAntiAffinity = podAntiAffinity
	return affinity
}

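// convertTopologySpreadConstraints4Legacy derives one topology spread
// constraint per legacy TopologyKey, with MaxSkew 1 and a label selector for
// pods of the same cluster instance and component. The WhenUnsatisfiable
// action mirrors the anti-affinity strictness: DoNotSchedule for Required,
// ScheduleAnyway otherwise.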
func convertTopologySpreadConstraints4Legacy(clusterName, compName string, compAffinity *Affinity) []corev1.TopologySpreadConstraint {
	if compAffinity == nil {
		return nil
	}

	var topologySpreadConstraints []corev1.TopologySpreadConstraint

	var whenUnsatisfiable corev1.UnsatisfiableConstraintAction
	if compAffinity.PodAntiAffinity == Required {
		whenUnsatisfiable = corev1.DoNotSchedule
	} else {
		whenUnsatisfiable = corev1.ScheduleAnyway
	}
	for _, topologyKey := range compAffinity.TopologyKeys {
		topologySpreadConstraints = append(topologySpreadConstraints, corev1.TopologySpreadConstraint{
			MaxSkew:           1,
			WhenUnsatisfiable: whenUnsatisfiable,
			TopologyKey:       topologyKey,
			LabelSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app.kubernetes.io/instance":        clusterName,
					"apps.kubeblocks.io/component-name": compName,
				},
			},
		})
	}
	return topologySpreadConstraints
}
