Skip to content

Commit

Permalink
fix: Use mutex in the scaler cache to prevent concurrent refreshings (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JorTurFer authored Nov 4, 2024
1 parent 54c315d commit 5ea1eac
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Here is an overview of all new **experimental** features:

### Fixes

- **General**: Scalers cache uses a mutex to prevent concurrent actions ([#6273](https://github.com/kedacore/keda/issues/6273))
- **AWS Secret Manager**: Pod identity overrides are honored ([#6195](https://github.com/kedacore/keda/issues/6195))
- **AWS SQS Scaler**: Improve error handling for SQS queue metrics ([#6178](https://github.com/kedacore/keda/issues/6178))
- **Azure Event Hub Scaler**: Checkpointer errors are correctly handled ([#6084](https://github.com/kedacore/keda/issues/6084))
Expand Down
64 changes: 45 additions & 19 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache
import (
"context"
"fmt"
"sync"
"time"

"github.com/expr-lang/expr/vm"
Expand All @@ -40,6 +41,7 @@ type ScalersCache struct {
ScalableObjectGeneration int64
Recorder record.EventRecorder
CompiledFormula *vm.Program
mutex sync.RWMutex
}

type ScalerBuilder struct {
Expand All @@ -50,6 +52,9 @@ type ScalerBuilder struct {

// GetScalers returns array of scalers and scaler config stored in the cache
func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalersconfig.ScalerConfig) {
c.mutex.RLock()
defer c.mutex.RUnlock()

scalersList := make([]scalers.Scaler, 0, len(c.Scalers))
configsList := make([]scalersconfig.ScalerConfig, 0, len(c.Scalers))
for _, s := range c.Scalers {
Expand All @@ -60,6 +65,17 @@ func (c *ScalersCache) GetScalers() ([]scalers.Scaler, []scalersconfig.ScalerCon
return scalersList, configsList
}

// getScalerBuilder returns a ScalerBuilder stored in the cache
func (c *ScalersCache) getScalerBuilder(index int) (ScalerBuilder, error) {
if index < 0 || index >= len(c.Scalers) {
return ScalerBuilder{}, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}

c.mutex.RLock()
defer c.mutex.RUnlock()
return c.Scalers[index], nil
}

// GetPushScalers returns array of push scalers stored in the cache
func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {
var result []scalers.PushScaler
Expand All @@ -73,8 +89,10 @@ func (c *ScalersCache) GetPushScalers() []scalers.PushScaler {

// Close closes all scalers in the cache
func (c *ScalersCache) Close(ctx context.Context) {
c.mutex.Lock()
scalers := c.Scalers
c.Scalers = nil
c.mutex.Unlock()
for _, s := range scalers {
err := s.Scaler.Close(ctx)
if err != nil {
Expand All @@ -85,6 +103,8 @@ func (c *ScalersCache) Close(ctx context.Context) {

// GetMetricSpecForScaling returns metrics specs for all scalers in the cache
func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricSpec {
c.mutex.RLock()
defer c.mutex.RUnlock()
var spec []v2.MetricSpec
for _, s := range c.Scalers {
spec = append(spec, s.Scaler.GetMetricSpecForScaling(ctx)...)
Expand All @@ -96,12 +116,12 @@ func (c *ScalersCache) GetMetricSpecForScaling(ctx context.Context) []v2.MetricS
func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, index int) ([]v2.MetricSpec, error) {
var err error

scalersList, _ := c.GetScalers()
if index < 0 || index >= len(scalersList) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
sb, err := c.getScalerBuilder(index)
if err != nil {
return nil, err
}

metricSpecs := scalersList[index].GetMetricSpecForScaling(ctx)
metricSpecs := sb.Scaler.GetMetricSpecForScaling(ctx)

// no metric spec returned for a scaler -> this could signal error during connection to the scaler
// usually in case this is an external scaler
Expand All @@ -123,11 +143,12 @@ func (c *ScalersCache) GetMetricSpecForScalingForScaler(ctx context.Context, ind
// GetMetricsAndActivityForScaler returns metric value, activity and latency for a scaler identified by the metric name
// and by the input index (from the list of scalers in this ScaledObject)
func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, time.Duration, error) {
if index < 0 || index >= len(c.Scalers) {
return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
sb, err := c.getScalerBuilder(index)
if err != nil {
return nil, false, -1, err
}
startTime := time.Now()
metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
metric, activity, err := sb.Scaler.GetMetricsAndActivity(ctx, metricName)
if err == nil {
return metric, activity, time.Since(startTime), nil
}
Expand All @@ -141,26 +162,31 @@ func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index
return metric, activity, time.Since(startTime), err
}

func (c *ScalersCache) refreshScaler(ctx context.Context, id int) (scalers.Scaler, error) {
if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
func (c *ScalersCache) refreshScaler(ctx context.Context, index int) (scalers.Scaler, error) {
oldSb, err := c.getScalerBuilder(index)
if err != nil {
return nil, err
}

sb := c.Scalers[id]
defer sb.Scaler.Close(ctx)
ns, sConfig, err := sb.Factory()
c.mutex.Lock()
defer c.mutex.Unlock()

newScaler, sConfig, err := oldSb.Factory()
if err != nil {
return nil, err
}

if id < 0 || id >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found, len = %d, cache has been probably already invalidated", id, len(c.Scalers))
if index < 0 || index >= len(c.Scalers) {
return nil, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
}
c.Scalers[id] = ScalerBuilder{
Scaler: ns,

c.Scalers[index] = ScalerBuilder{
Scaler: newScaler,
ScalerConfig: *sConfig,
Factory: sb.Factory,
Factory: oldSb.Factory,
}

return ns, nil
oldSb.Scaler.Close(ctx)

return newScaler, nil
}

0 comments on commit 5ea1eac

Please sign in to comment.