Skip to content

Commit

Permalink
Merge branch 'main' into aliaqel/grpc-request-metrics
Browse files Browse the repository at this point in the history
Signed-off-by: aliaqel-stripe <120822631+aliaqel-stripe@users.noreply.github.com>
  • Loading branch information
aliaqel-stripe authored Mar 26, 2024
2 parents ff8a95a + 3bf5151 commit 8e7ae09
Show file tree
Hide file tree
Showing 9 changed files with 362 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add GRPC Client and Server metrics ([#5502](https://github.com/kedacore/keda/issues/5502))
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
Expand All @@ -76,6 +77,7 @@ Here is an overview of all new **experimental** features:
- **General**: Prometheus metrics shows errors correctly ([#5597](https://github.com/kedacore/keda/issues/5597))
- **General**: Validate empty array value of triggers in ScaledObject/ScaledJob creation ([#5520](https://github.com/kedacore/keda/issues/5520))
- **GitHub Runner Scaler**: Fixed `in_progress` detection on running jobs instead of just `queued` ([#5604](https://github.com/kedacore/keda/issues/5604))
- **New Relic Scaler**: Consider empty results set from query executer ([#5619](https://github.com/kedacore/keda/pull/5619))

### Deprecations

Expand Down
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func main() {
close(certReady)
}

grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr, certDir, certReady)
grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr, certDir, certReady, mgr.Elected())
if err := mgr.Add(&grpcServer); err != nil {
setupLog.Error(err, "unable to set up Metrics Service gRPC server")
os.Exit(1)
Expand Down
28 changes: 17 additions & 11 deletions pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,23 @@ type GrpcClient struct {
}

func NewGrpcClient(url, certDir, authority string, clientMetrics *grpcprom.ClientMetrics) (*GrpcClient, error) {
defaultConfig := `{
"methodConfig": [{
"timeout": "3s",
"waitForReady": true,
"retryPolicy": {
"InitialBackoff": ".25s",
"MaxBackoff": "2.0s",
"BackoffMultiplier": 2,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
defaultConfig := fmt.Sprintf(`{
"methodConfig": [{
"timeout": "3s",
"waitForReady": true,
"retryPolicy": {
"InitialBackoff": ".25s",
"MaxBackoff": "2.0s",
"BackoffMultiplier": 2,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}],
"loadBalancingPolicy": "round_robin",
"healthCheckConfig": {
"serviceName": "%s"
}
}`,
api.MetricsService_ServiceDesc.ServiceName)

creds, err := utils.LoadGrpcTLSCredentials(certDir, false)
if err != nil {
Expand Down
44 changes: 31 additions & 13 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -35,9 +37,11 @@ var log = logf.Log.WithName("grpc_server")

type GrpcServer struct {
server *grpc.Server
healthServer *health.Server
address string
certDir string
certsReady chan struct{}
elected <-chan struct{}
scalerHandler *scaling.ScaleHandler
api.UnimplementedMetricsServiceServer
}
Expand All @@ -61,12 +65,13 @@ func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*
}

// NewGrpcServer creates a new instance of GrpcServer
func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address, certDir string, certsReady chan struct{}) GrpcServer {
func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address, certDir string, certsReady chan struct{}, elected <-chan struct{}) GrpcServer {
return GrpcServer{
address: address,
scalerHandler: scaleHandler,
certDir: certDir,
certsReady: certsReady,
elected: elected,
}
}

Expand All @@ -83,8 +88,8 @@ func (s *GrpcServer) startServer() error {
return nil
}

// Start starts a new gRPC Metrics Service, this implements Runnable interface
// of controller-runtime Manager, so we can use mgr.Add() to start this component.
// StartGrpcServer starts the grpc server in non-serving mode and when the controller is elected leader
// sets the status of the server to Serving.
func (s *GrpcServer) Start(ctx context.Context) error {
<-s.certsReady
if s.server == nil {
Expand All @@ -107,30 +112,43 @@ func (s *GrpcServer) Start(ctx context.Context) error {

s.server = grpc.NewServer(grpcServerOpts...)
api.RegisterMetricsServiceServer(s.server, s)

s.healthServer = health.NewServer()
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
grpc_health_v1.RegisterHealthServer(s.server, s.healthServer)
}

errChan := make(chan error)

go func() {
log.Info("Starting Metrics Service gRPC Server", "address", s.address)
if err := s.startServer(); err != nil {
if err := s.startServer(); err != nil && err != grpc.ErrServerStopped {
err := fmt.Errorf("unable to start Metrics Service gRPC server on address %s, error: %w", s.address, err)
log.Error(err, "error starting Metrics Service gRPC server")
errChan <- err
}
}()

select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
for {
select {
case err := <-errChan:
return err
case <-ctx.Done():
log.Info("Shutting down gRPC server")
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
s.server.GracefulStop()
return nil
case <-s.elected:
// clear the channel now that we are leader-elected
s.elected = nil
log.Info("Setting gRPC server status to Serving")
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
}
}
}

// NeedLeaderElection is needed to implement LeaderElectionRunnable interface
// of controller-runtime. This assures that the component is started/stoped
// when this particular instance is selected/deselected as a leader.
// We don't want to wait until LeaderElection to start the GRPC server, but we want to switch to Serving state once we are elected.
// Hence, here, we say we don't need leader election here and above we listen to the Elected channel from the manager to set the server to Serving
func (s *GrpcServer) NeedLeaderElection() bool {
return true
return false
}
7 changes: 7 additions & 0 deletions pkg/scalers/newrelic_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ func (s *newrelicScaler) executeNewRelicQuery(ctx context.Context) (float64, err
if err != nil {
return 0, fmt.Errorf("error running NRQL %s (%s)", s.metadata.nrql, err.Error())
}
// Check for empty results set, as New Relic lib does not report these as errors
if len(resp.Results) == 0 {
if s.metadata.noDataError {
return 0, fmt.Errorf("query return no results %s", s.metadata.nrql)
}
return 0, nil
}
// Only use the first result from the query, as the query should not be multi row
for _, v := range resp.Results[0] {
val, ok := v.(float64)
Expand Down
117 changes: 117 additions & 0 deletions vendor/google.golang.org/grpc/health/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/google.golang.org/grpc/health/logging.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8e7ae09

Please sign in to comment.