Skip to content

Commit

Permalink
grpc client: wait properly for establishing a connection (kedacore#3938)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>

Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik committed Dec 1, 2022
1 parent e78e112 commit dec8d9c
Show file tree
Hide file tree
Showing 85 changed files with 676 additions and 1,104 deletions.
File renamed without changes.
12 changes: 11 additions & 1 deletion apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ type ScaleTarget struct {
type ScaleTriggers struct {
Type string `json:"type"`
// +optional
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`
// +optional
CacheDuration *int32 `json:"cacheDuration,omitempty"`

EnableCache bool `json:"enableCache,omitempty"`

Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
Expand Down Expand Up @@ -179,3 +184,8 @@ type ScaledObjectAuthRef struct {
func init() {
SchemeBuilder.Register(&ScaledObject{}, &ScaledObjectList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (s *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier(s.Kind, s.Namespace, s.Name)
}
5 changes: 5 additions & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7861,6 +7861,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ spec:
required:
- name
type: object
cacheDuration:
format: int32
type: integer
enableCache:
type: boolean
metadata:
additionalProperties:
type: string
Expand Down
1 change: 0 additions & 1 deletion controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
return scaler, &scalers.ScalerConfig{}, nil
},
}},
Logger: logr.Discard(),
Recorder: nil,
}
metricSpec := v2.MetricSpec{
Expand Down
34 changes: 17 additions & 17 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -107,7 +107,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -117,21 +117,21 @@ var _ = Describe("fallback", func() {
})

It("should propagate the error when fallback is disabled", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))

so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should bump the number of failures when metrics call fails", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(0)

so := buildScaledObject(
Expand All @@ -152,7 +152,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expand All @@ -161,7 +161,7 @@ var _ = Describe("fallback", func() {
})

It("should return a normalised metric when number of failures are beyond threshold", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -182,7 +182,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -214,7 +214,7 @@ var _ = Describe("fallback", func() {
})

It("should ignore error if we fail to update kubernetes status", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
expectedMetricValue := int64(100)

Expand All @@ -238,7 +238,7 @@ var _ = Describe("fallback", func() {
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("Some error"))
client.EXPECT().Status().Return(statusWriter)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
Expand All @@ -248,7 +248,7 @@ var _ = Describe("fallback", func() {
})

It("should return error when fallback is enabled but scaledobject has invalid parameter", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)

so := buildScaledObject(
Expand All @@ -268,15 +268,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)

Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
})

It("should set the fallback condition when a fallback exists in the scaled object", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -302,15 +302,15 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ToNot(HaveOccurred())
condition := so.Status.Conditions.GetFallbackCondition()
Expect(condition.IsTrue()).Should(BeTrue())
})

It("should set the fallback condition to false if the config is invalid", func() {
scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return(nil, errors.New("Some error"))
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("Some error"))
startingNumberOfFailures := int32(3)
failingNumberOfFailures := int32(6)
anotherMetricName := "another metric name"
Expand All @@ -336,7 +336,7 @@ var _ = Describe("fallback", func() {
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, err := scaler.GetMetrics(context.Background(), metricName)
metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, err = GetMetricsWithFallback(context.Background(), client, logger, metrics, err, metricName, so, metricSpec)
Expect(err).ShouldNot(BeNil())
Expect(err.Error()).Should(Equal("Some error"))
Expand Down Expand Up @@ -425,7 +425,7 @@ func primeGetMetrics(scaler *mock_scalers.MockScaler, value int64) {
Timestamp: metav1.Now(),
}

scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, nil)
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return([]external_metrics.ExternalMetricValue{expectedMetric}, true, nil)
}

func createMetricSpec(averageValue int) v2.MetricSpec {
Expand Down
31 changes: 29 additions & 2 deletions pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package metricsservice
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/metrics/pkg/apis/external_metrics"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
Expand All @@ -29,7 +32,8 @@ import (
)

type GrpcClient struct {
client api.MetricsServiceClient
client api.MetricsServiceClient
connection *grpc.ClientConn
}

func NewGrpcClient(url string) (*GrpcClient, error) {
Expand All @@ -51,7 +55,7 @@ func NewGrpcClient(url string) (*GrpcClient, error) {
return nil, err
}

return &GrpcClient{client: api.NewMetricsServiceClient(conn)}, nil
return &GrpcClient{client: api.NewMetricsServiceClient(conn), connection: conn}, nil
}

func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *api.PromMetricsMsg, error) {
Expand All @@ -68,3 +72,26 @@ func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObj

return extMetrics, response.GetPromMetrics(), nil
}

// WaitForConnectionReady waits for gRPC connection to be ready
// returns true if the connection was successful, false if we hit a timeut from context
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Logger) bool {
currentState := c.connection.GetState()
if currentState != connectivity.Ready {
logger.Info("Waiting for establishing a gRPC connection to KEDA Metrics Server")
for {
select {
case <-ctx.Done():
return false
default:
c.connection.Connect()
time.Sleep(500 * time.Millisecond)
currentState := c.connection.GetState()
if currentState == connectivity.Ready {
return true
}
}
}
}
return true
}
6 changes: 3 additions & 3 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (

"google.golang.org/grpc"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/metricsservice/api"
"github.com/kedacore/keda/v2/pkg/scaling"
)

var log = ctrl.Log.WithName("grpc_server")
var log = logf.Log.WithName("grpc_server")

type GrpcServer struct {
server *grpc.Server
Expand All @@ -41,7 +41,7 @@ type GrpcServer struct {
// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*api.Response, error) {
v1beta1ExtMetrics := &v1beta1.ExternalMetricValueList{}
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetExternalMetrics(ctx, in.Name, in.Namespace, in.MetricName)
extMetrics, exportedMetrics, err := (*s.scalerHandler).GetScaledObjectMetrics(ctx, in.Name, in.Namespace, in.MetricName)
if err != nil {
return nil, fmt.Errorf("error when getting metric values %s", err)
}
Expand Down
64 changes: 18 additions & 46 deletions pkg/mock/mock_scaler/mock_scaler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit dec8d9c

Please sign in to comment.