Skip to content

Commit

Permalink
fix cache identifiers
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik committed Nov 16, 2022
1 parent 576b28f commit 5c4ed0d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 11 deletions.
4 changes: 2 additions & 2 deletions apis/keda/v1alpha1/indentifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
)

// GenerateIdenitifier returns identifier for the object in form "kind.namespace.name"
func GenerateIdenitifier(kind, name, namespace string) string {
// GenerateIdentifier returns identifier for the object in form "kind.namespace.name" (lowercase)
func GenerateIdentifier(kind, namespace, name string) string {
return strings.ToLower(fmt.Sprintf("%s.%s.%s", kind, namespace, name))
}
6 changes: 3 additions & 3 deletions apis/keda/v1alpha1/withtriggers_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (t *WithTriggers) GetPollingInterval() time.Duration {
return time.Second * time.Duration(defaultPollingInterval)
}

// GenerateIdenitifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdenitifier() string {
return GenerateIdenitifier(t.Kind, t.Namespace, t.Name)
// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (t *WithTriggers) GenerateIdentifier() string {
return GenerateIdentifier(t.Kind, t.Namespace, t.Name)
}
4 changes: 4 additions & 0 deletions pkg/scaling/cache/scalers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (c *ScalersCache) IsScaledJobActive(ctx context.Context, scaledJob *kedav1a
return isActive, ceilToInt64(queueLength), ceilToInt64(maxValue)
}

// TODO this is probably not needed, revisit whole package
func (c *ScalersCache) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
var metrics []external_metrics.ExternalMetricValue
for i, s := range c.Scalers {
Expand Down Expand Up @@ -246,7 +247,9 @@ type scalerMetrics struct {
isActive bool
}

// TODO needs refactor
func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scalerMetrics {
// TODO this loop should be probably done similar way the ScaledObject loop is done
var scalersMetrics []scalerMetrics
for i, s := range c.Scalers {
var queueLength float64
Expand Down Expand Up @@ -282,6 +285,7 @@ func (c *ScalersCache) getScaledJobMetrics(ctx context.Context, scaledJob *kedav

targetAverageValue = getTargetAverageValue(metricSpecs)

// TODO this should probably be `cache.GetMetricsForScaler(ctx, scalerIndex, metricName)`
metrics, err := s.Scaler.GetMetrics(ctx, metricSpecs[0].External.Metric.Name)
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics, but continue", "Error", err)
Expand Down
15 changes: 9 additions & 6 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject
return err
}

key := withTriggers.GenerateIdenitifier()
key := withTriggers.GenerateIdentifier()
ctx, cancel := context.WithCancel(ctx)

// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject
return err
}

key := withTriggers.GenerateIdenitifier()
key := withTriggers.GenerateIdentifier()
result, ok := h.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1a
}

func (h *scaleHandler) GetScalersCacheForScaledObject(ctx context.Context, scaledObjectName, scaledObjectNamespace string) (*cache.ScalersCache, error) {
key := kedav1alpha1.GenerateIdenitifier("ScaledObject", scaledObjectName, scaledObjectNamespace)
key := kedav1alpha1.GenerateIdentifier("ScaledObject", scaledObjectNamespace, scaledObjectName)

h.lock.RLock()
if cache, ok := h.scalerCaches[key]; ok {
Expand Down Expand Up @@ -225,7 +225,7 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter
return nil, err
}

key := withTriggers.GenerateIdenitifier()
key := withTriggers.GenerateIdentifier()

h.lock.RLock()
if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation {
Expand Down Expand Up @@ -275,12 +275,12 @@ func (h *scaleHandler) ClearScalersCache(ctx context.Context, scalableObject int
return err
}

key := withTriggers.GenerateIdenitifier()
key := withTriggers.GenerateIdentifier()

h.lock.Lock()
defer h.lock.Unlock()

if cache, ok := h.scalerCaches[key]; ok {
h.logger.V(1).WithValues("key", key).Info("Removing entry from ScalersCache")
cache.Close(ctx)
delete(h.scalerCaches, key)
}
Expand Down Expand Up @@ -357,6 +357,9 @@ func (h *scaleHandler) GetExternalMetricsValuesList(ctx context.Context, cache *
// let's check metrics for all scalers in a ScaledObject
scalerError := false
scalers, scalerConfigs := cache.GetScalers()

h.logger.V(1).WithValues("name", scaledObject.Name, "namespace", scaledObject.Namespace, "metricName", metricName, "scalers", scalers).Info("Getting metric value")

for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ {
metricSpecs := scalers[scalerIndex].GetMetricSpecForScaling(ctx)
scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1)
Expand Down

0 comments on commit 5c4ed0d

Please sign in to comment.