Skip to content

Commit

Permalink
grpc client: wait properly for establishing a connection (#3938)
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>

Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik authored Nov 30, 2022
1 parent e78e112 commit 0f21eac
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
File renamed without changes.
31 changes: 29 additions & 2 deletions pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package metricsservice
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/metrics/pkg/apis/external_metrics"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
Expand All @@ -29,7 +32,8 @@ import (
)

type GrpcClient struct {
client api.MetricsServiceClient
client api.MetricsServiceClient
connection *grpc.ClientConn
}

func NewGrpcClient(url string) (*GrpcClient, error) {
Expand All @@ -51,7 +55,7 @@ func NewGrpcClient(url string) (*GrpcClient, error) {
return nil, err
}

return &GrpcClient{client: api.NewMetricsServiceClient(conn)}, nil
return &GrpcClient{client: api.NewMetricsServiceClient(conn), connection: conn}, nil
}

func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObjectNamespace, metricName string) (*external_metrics.ExternalMetricValueList, *api.PromMetricsMsg, error) {
Expand All @@ -68,3 +72,26 @@ func (c *GrpcClient) GetMetrics(ctx context.Context, scaledObjectName, scaledObj

return extMetrics, response.GetPromMetrics(), nil
}

// WaitForConnectionReady waits for gRPC connection to be ready
// returns true if the connection was successful, false if we hit a timeut from context
func (c *GrpcClient) WaitForConnectionReady(ctx context.Context, logger logr.Logger) bool {
currentState := c.connection.GetState()
if currentState != connectivity.Ready {
logger.Info("Waiting for establishing a gRPC connection to KEDA Metrics Server")
for {
select {
case <-ctx.Done():
return false
default:
c.connection.Connect()
time.Sleep(500 * time.Millisecond)
currentState := c.connection.GetState()
if currentState == connectivity.Ready {
return true
}
}
}
}
return true
}
33 changes: 20 additions & 13 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,34 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,

// Get Metrics from Metrics Service gRPC Server
if p.useMetricsServiceGrpc {
if !p.grpcClient.WaitForConnectionReady(ctx, logger) {
logger.Error(fmt.Errorf("timeout while waiting to establish gRPC connection to KEDA Metrics Server"), "timeout")
return nil, err
}

// selector is in form: `scaledobject.keda.sh/name: scaledobject-name`
scaledObjectName := selector.Get("scaledobject.keda.sh/name")

metrics, promMetrics, err := p.grpcClient.GetMetrics(ctx, scaledObjectName, namespace, info.Metric)
logger.V(1).WithValues("scaledObjectName", scaledObjectName, "scaledObjectNamespace", namespace, "metrics", metrics).Info("Receiving metrics")

// [DEPRECATED] handle exporting Prometheus metrics from Operator to Metrics Server
var scaledObjectErr error
if promMetrics.ScaledObjectErr {
scaledObjectErr = fmt.Errorf("scaledObject error")
}
promMetricsServer.RecordScaledObjectError(namespace, scaledObjectName, scaledObjectErr)
for _, scalerMetric := range promMetrics.ScalerMetric {
promMetricsServer.RecordHPAScalerMetric(namespace, scaledObjectName, scalerMetric.ScalerName, int(scalerMetric.ScalerIndex), scalerMetric.MetricName, float64(scalerMetric.MetricValue))
}
for _, scalerError := range promMetrics.ScalerError {
var scalerErr error
if scalerError.Error {
scalerErr = fmt.Errorf("scaler error")
if promMetrics != nil {
var scaledObjectErr error
if promMetrics.ScaledObjectErr {
scaledObjectErr = fmt.Errorf("scaledObject error")
}
promMetricsServer.RecordScaledObjectError(namespace, scaledObjectName, scaledObjectErr)
for _, scalerMetric := range promMetrics.ScalerMetric {
promMetricsServer.RecordHPAScalerMetric(namespace, scaledObjectName, scalerMetric.ScalerName, int(scalerMetric.ScalerIndex), scalerMetric.MetricName, float64(scalerMetric.MetricValue))
}
for _, scalerError := range promMetrics.ScalerError {
var scalerErr error
if scalerError.Error {
scalerErr = fmt.Errorf("scaler error")
}
promMetricsServer.RecordHPAScalerError(namespace, scaledObjectName, scalerError.ScalerName, int(scalerError.ScalerIndex), scalerError.MetricName, scalerErr)
}
promMetricsServer.RecordHPAScalerError(namespace, scaledObjectName, scalerError.ScalerName, int(scalerError.ScalerIndex), scalerError.MetricName, scalerErr)
}

return metrics, err
Expand Down

0 comments on commit 0f21eac

Please sign in to comment.