Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto-scaling calculation based by CPU load #1722

Merged
merged 20 commits into from
Feb 19, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/calculate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"encoding/json"
"fmt"
"math"
"net/http"
"strconv"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
"k8s.io/apimachinery/pkg/util/sets"
)

const (
TikvSumCpuMetricsPattern = `sum(tikv_thread_cpu_seconds_total{cluster="%s"}) by (instance)`
TidbSumCpuMetricsPattern = `sum(process_cpu_seconds_total{cluster="%s",job="tidb"}) by (instance)`
InvalidTacMetricConfigureMsg = "tac[%s/%s] metric configure invalid"
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
CpuSumMetricsErrorMsg = "tac[%s/%s] cpu sum metrics error,can't calculate past %s cpu metrics,might caused by prometheus restart with no persistence"
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
queryPath = "/api/v1/query"

float64EqualityThreshold = 1e-9
)

func queryMetricsFromPrometheus(tac *v1alpha1.TidbClusterAutoScaler,
query string, client promClient.Client, timestamp int64, resp *Response) error {
req, err := http.NewRequest("GET", fmt.Sprintf("%s%s", *tac.Spec.MetricsUrl, queryPath), nil)
if err != nil {
return err
}
q := req.URL.Query()
q.Add("query", query)
q.Add("time", fmt.Sprintf("%d", timestamp))
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
req.URL.RawQuery = q.Encode()
r, body, err := client.Do(req.Context(), req)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
if r.StatusCode != http.StatusOK {
return fmt.Errorf("tac[%s/%s]' query error,status code:%d", tac.Namespace, tac.Name, r.StatusCode)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
err = json.Unmarshal(body, resp)
if err != nil {
return err
}
if resp.Status != statusSuccess {
return fmt.Errorf("tac[%s/%s]' query error, response stataus:%v", tac.Namespace, tac.Name, resp.Status)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// sumByInstanceFromResponse sum the value in Response of each instance from Prometheus
func sumByInstanceFromResponse(instances []string, resp *Response) (float64, error) {
s := sets.String{}
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
for _, instance := range instances {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
s.Insert(instance)
}
sum := 0.0
for _, r := range resp.Data.Result {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if s.Has(r.Metric.Instance) {
v, err := strconv.ParseFloat(r.Value[1].(string), 64)
if err != nil {
return 0.0, err
}
sum = sum + v
}
}
return sum, nil
}

// calculate func calculate the recommended replicas by given usageRadio and currentReplicas
func calculate(currentValue float64, targetValue float64, currentReplicas int32) (int32, error) {
if almostEqual(targetValue, 0.0) {
return 0, fmt.Errorf("targetValue in calculate func can't be zero")
}
usageRadio := currentValue / targetValue
return int32(math.Ceil(usageRadio * float64(currentReplicas))), nil
}

func almostEqual(a, b float64) bool {
return math.Abs(a-b) <= float64EqualityThreshold
}
85 changes: 85 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
)

//TODO: create issue to explain how auto-scaling algorithm based by cpu metrics work
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
func CalculateCpuMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet,
client promClient.Client, instances []string, metric autoscalingv2beta2.MetricSpec,
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
queryPattern, timeWindow string, memberType v1alpha1.MemberType) (int32, error) {
if metric.Resource == nil || metric.Resource.Target.AverageUtilization == nil {
return 0, fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}
currentReplicas := len(instances)
c, err := filterContainer(tac, sts, memberType.String())
if err != nil {
return 0, err
}
cpuRequestsRadio, err := extractCpuRequestsRadio(c)
if err != nil {
return 0, err
}
now := time.Now()
duration, err := time.ParseDuration(timeWindow)
if err != nil {
return 0, err
}
prvious := now.Truncate(duration)
r := &Response{}
err = queryMetricsFromPrometheus(tac, fmt.Sprintf(queryPattern, tac.Spec.Cluster.Name), client, now.Unix(), r)
if err != nil {
return 0, err
}
sum1, err := sumByInstanceFromResponse(instances, r)
if err != nil {
return 0, err
}
err = queryMetricsFromPrometheus(tac, fmt.Sprintf(queryPattern, tac.Spec.Cluster.Name), client, prvious.Unix(), r)
if err != nil {
return 0, err
}
sum2, err := sumByInstanceFromResponse(instances, r)
if err != nil {
return 0, err
}
if sum1-sum2 < 0 {
return 0, fmt.Errorf(CpuSumMetricsErrorMsg, tac.Namespace, tac.Name, timeWindow)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This algorithm be largely affected by prometheus counter reset (e.g. process restart), an alternative is the prometheus increase func

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

cpuSecsTotal := sum1 - sum2
durationSeconds := duration.Seconds()
utilizationRadio := float64(*metric.Resource.Target.AverageUtilization) / 100.0
expectedCpuSecsTotal := cpuRequestsRadio * durationSeconds * float64(currentReplicas) * utilizationRadio
rc, err := calculate(cpuSecsTotal, expectedCpuSecsTotal, int32(currentReplicas))
if err != nil {
return 0, err
}
return rc, nil
}

func extractCpuRequestsRadio(c *corev1.Container) (float64, error) {
if c.Resources.Requests.Cpu() == nil || c.Resources.Requests.Cpu().MilliValue() < 1 {
return 0, fmt.Errorf("container[%s] cpu requests is empty", c.Name)
}
return float64(c.Resources.Requests.Cpu().MilliValue()) / 1000.0, nil
}
89 changes: 89 additions & 0 deletions pkg/autoscaler/autoscaler/calculate/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package calculate

import (
"fmt"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
)

// currently, we only choose one metrics to be computed.
// If there exists several metrics, we tend to choose ResourceMetricSourceType metric
func FilterMetrics(metrics []autoscalingv2beta2.MetricSpec) autoscalingv2beta2.MetricSpec {
for _, m := range metrics {
if m.Type == autoscalingv2beta2.ResourceMetricSourceType && m.Resource != nil {
return m
}
}
return metrics[0]
}

// MetricType describe the current Supported Metric Type to calculate the recommended Replicas
type MetricType string
Yisaer marked this conversation as resolved.
Show resolved Hide resolved

const (
MetricTypeCPU MetricType = "cpu"
//metricTypeQPS MetricType = "qps"
)

// genMetricType return the supported MetricType in Operator by kubernetes auto-scaling MetricType
func GenMetricType(tac *v1alpha1.TidbClusterAutoScaler, metric autoscalingv2beta2.MetricSpec) (MetricType, error) {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
if metric.Type == autoscalingv2beta2.ResourceMetricSourceType && metric.Resource != nil && metric.Resource.Name == corev1.ResourceCPU {
return MetricTypeCPU, nil
}
return "", fmt.Errorf(InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
}

// filterContainer is to filter the specific container from the given statefulset(tidb/tikv)
func filterContainer(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, containerName string) (*corev1.Container, error) {
for _, c := range sts.Spec.Template.Spec.Containers {
if c.Name == containerName {
return &c, nil
}
}
return nil, fmt.Errorf("tac[%s/%s]'s Target have not %s container", tac.Namespace, tac.Name, containerName)
}

const (
statusSuccess = "success"
)

// Response is used to marshal the data queried from Prometheus
type Response struct {
Status string `json:"status"`
Data Data `json:"data"`
}

type Data struct {
ResultType string `json:"resultType"`
Result []Result `json:"result"`
}

type Result struct {
Metric Metric `json:"metric"`
Value []interface{} `json:"value"`
}

type Metric struct {
Cluster string `json:"cluster,omitempty"`
Instance string `json:"instance"`
Job string `json:"job,omitempty"`
KubernetesNamespace string `json:"kubernetes_namespace,omitempty"`
KubernetesNode string `json:"kubernetes_node,omitempty"`
KubernetesPodIp string `json:"kubernetes_pod_ip,omitempty"`
}
31 changes: 30 additions & 1 deletion pkg/autoscaler/autoscaler/tidb_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
package autoscaler

import (
"fmt"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/autoscaler/autoscaler/calculate"
"github.com/pingcap/tidb-operator/pkg/label"
operatorUtils "github.com/pingcap/tidb-operator/pkg/util"
promClient "github.com/prometheus/client_golang/api"
appsv1 "k8s.io/api/apps/v1"
)

func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.TidbClusterAutoScaler, client promClient.Client) error {
Expand All @@ -36,7 +39,11 @@ func (am *autoScalerManager) syncTiDB(tc *v1alpha1.TidbCluster, tac *v1alpha1.Ti
return nil
}
currentReplicas := tc.Spec.TiDB.Replicas
targetReplicas := calculateRecommendedReplicas(tac, v1alpha1.TiDBMemberType, client)
instances := filterTidbInstances(tc)
targetReplicas, err := calculateTidbMetrics(tac, sts, client, instances)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the entrance function for TiDB to calculate auto-scaling by each metric.

if err != nil {
return err
}
targetReplicas = limitTargetReplicas(targetReplicas, tac, v1alpha1.TiDBMemberType)
if targetReplicas == tc.Spec.TiDB.Replicas {
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
Expand Down Expand Up @@ -80,3 +87,25 @@ func updateTcTiDBAnnIfScale(tac *v1alpha1.TidbClusterAutoScaler) {
tac.Annotations[label.AnnTiDBLastAutoScalingTimestamp] = time.Now().String()
emptyAutoScalingCountAnn(tac, v1alpha1.TiDBMemberType)
}

func calculateTidbMetrics(tac *v1alpha1.TidbClusterAutoScaler, sts *appsv1.StatefulSet, client promClient.Client, instances []string) (int32, error) {
metric := calculate.FilterMetrics(tac.Spec.TiDB.Metrics)
mType, err := calculate.GenMetricType(tac, metric)
if err != nil {
return 0, err
}
switch mType {
case calculate.MetricTypeCPU:
return calculate.CalculateCpuMetrics(tac, sts, client, instances, metric, calculate.TidbSumCpuMetricsPattern, *tac.Spec.TiDB.MetricsTimeDuration, v1alpha1.TiDBMemberType)
default:
return 0, fmt.Errorf(calculate.InvalidTacMetricConfigureMsg, tac.Namespace, tac.Name)
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
}
}

func filterTidbInstances(tc *v1alpha1.TidbCluster) []string {
var instances []string
for i := 0; int32(i) < tc.Spec.TiDB.Replicas; i++ {
Yisaer marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should take failover into account

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

instances = append(instances, operatorUtils.GetPodName(tc, v1alpha1.TiDBMemberType, int32(i)))
}
return instances
}
1 change: 1 addition & 0 deletions pkg/autoscaler/autoscaler/tidb_autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestSyncTiDBAfterCalculated(t *testing.T) {
tc.Spec.TiDB.Replicas = test.currentReplicas
tac.Annotations[label.AnnTiDBConsecutiveScaleInCount] = fmt.Sprintf("%d", test.currentScaleInCount)
tac.Annotations[label.AnnTiDBConsecutiveScaleOutCount] = fmt.Sprintf("%d", test.currentScaleOutCount)
tac.Spec.TiKV = nil

err := syncTiDBAfterCalculated(tc, tac, test.currentReplicas, test.recommendedReplicas)
g.Expect(err).ShouldNot(HaveOccurred())
Expand Down
Loading