Skip to content

Commit

Permalink
Add GRPC Healthchecks (#5581)
Browse files Browse the repository at this point in the history
* Add healthchecks

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add new vendor files

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* fix server startup

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add logging

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add graceful shutdown and client side healthchecking

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* remove context to fix static checks

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add changelog

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add round_robin policy

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* add round_robin policy

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* remove extra comma

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* change how grpc server starts relative to leader election

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

* update comment

Signed-off-by: Ali Aqel <aliaqel@stripe.com>

---------

Signed-off-by: Ali Aqel <aliaqel@stripe.com>
  • Loading branch information
aliaqel-stripe authored Mar 26, 2024
1 parent 6bc6139 commit 3bf5151
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
### Improvements

- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
Expand Down
2 changes: 1 addition & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func main() {
close(certReady)
}

grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr, certDir, certReady)
grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr, certDir, certReady, mgr.Elected())
if err := mgr.Add(&grpcServer); err != nil {
setupLog.Error(err, "unable to set up Metrics Service gRPC server")
os.Exit(1)
Expand Down
28 changes: 17 additions & 11 deletions pkg/metricsservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,23 @@ type GrpcClient struct {
}

func NewGrpcClient(url, certDir, authority string) (*GrpcClient, error) {
defaultConfig := `{
"methodConfig": [{
"timeout": "3s",
"waitForReady": true,
"retryPolicy": {
"InitialBackoff": ".25s",
"MaxBackoff": "2.0s",
"BackoffMultiplier": 2,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
defaultConfig := fmt.Sprintf(`{
"methodConfig": [{
"timeout": "3s",
"waitForReady": true,
"retryPolicy": {
"InitialBackoff": ".25s",
"MaxBackoff": "2.0s",
"BackoffMultiplier": 2,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}],
"loadBalancingPolicy": "round_robin",
"healthCheckConfig": {
"serviceName": "%s"
}
}`,
api.MetricsService_ServiceDesc.ServiceName)

creds, err := utils.LoadGrpcTLSCredentials(certDir, false)
if err != nil {
Expand Down
44 changes: 31 additions & 13 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -34,9 +36,11 @@ var log = logf.Log.WithName("grpc_server")

type GrpcServer struct {
server *grpc.Server
healthServer *health.Server
address string
certDir string
certsReady chan struct{}
elected <-chan struct{}
scalerHandler *scaling.ScaleHandler
api.UnimplementedMetricsServiceServer
}
Expand All @@ -60,12 +64,13 @@ func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*
}

// NewGrpcServer creates a new instance of GrpcServer
func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address, certDir string, certsReady chan struct{}) GrpcServer {
func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address, certDir string, certsReady chan struct{}, elected <-chan struct{}) GrpcServer {
return GrpcServer{
address: address,
scalerHandler: scaleHandler,
certDir: certDir,
certsReady: certsReady,
elected: elected,
}
}

Expand All @@ -82,8 +87,8 @@ func (s *GrpcServer) startServer() error {
return nil
}

// Start starts a new gRPC Metrics Service, this implements Runnable interface
// of controller-runtime Manager, so we can use mgr.Add() to start this component.
// StartGrpcServer starts the grpc server in non-serving mode and when the controller is elected leader
// sets the status of the server to Serving.
func (s *GrpcServer) Start(ctx context.Context) error {
<-s.certsReady
if s.server == nil {
Expand All @@ -93,30 +98,43 @@ func (s *GrpcServer) Start(ctx context.Context) error {
}
s.server = grpc.NewServer(grpc.Creds(creds))
api.RegisterMetricsServiceServer(s.server, s)

s.healthServer = health.NewServer()
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
grpc_health_v1.RegisterHealthServer(s.server, s.healthServer)
}

errChan := make(chan error)

go func() {
log.Info("Starting Metrics Service gRPC Server", "address", s.address)
if err := s.startServer(); err != nil {
if err := s.startServer(); err != nil && err != grpc.ErrServerStopped {
err := fmt.Errorf("unable to start Metrics Service gRPC server on address %s, error: %w", s.address, err)
log.Error(err, "error starting Metrics Service gRPC server")
errChan <- err
}
}()

select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
for {
select {
case err := <-errChan:
return err
case <-ctx.Done():
log.Info("Shutting down gRPC server")
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
s.server.GracefulStop()
return nil
case <-s.elected:
// clear the channel now that we are leader-elected
s.elected = nil
log.Info("Setting gRPC server status to Serving")
s.healthServer.SetServingStatus(api.MetricsService_ServiceDesc.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
}
}
}

// NeedLeaderElection is needed to implement LeaderElectionRunnable interface
// of controller-runtime. This assures that the component is started/stoped
// when this particular instance is selected/deselected as a leader.
// We don't want to wait until LeaderElection to start the GRPC server, but we want to switch to Serving state once we are elected.
// Hence, here, we say we don't need leader election here and above we listen to the Elected channel from the manager to set the server to Serving
func (s *GrpcServer) NeedLeaderElection() bool {
return true
return false
}
117 changes: 117 additions & 0 deletions vendor/google.golang.org/grpc/health/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions vendor/google.golang.org/grpc/health/logging.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3bf5151

Please sign in to comment.