Skip to content

Commit

Permalink
Metrics API Scaler support different formats (#5347)
Browse files Browse the repository at this point in the history
* Metrics API Scaler support different format

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* fix golangci-lint issues

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* trigger ci and go mod tidy

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* gci

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* fix tests for metrics_api_scaler; add new tests for value_by_path; fix []interface edge case for value_by_path

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* fix: add format field to e2e tests; edit comment about apiformat

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* update changelog

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* Apply suggestions from code review

Co-authored-by: Jan Wozniak <wozniak.jan@gmail.com>
Signed-off-by: Murr Kyuri <39532283+Friedrich42@users.noreply.github.com>

* decrease duplication by defining a variable for inputs, revert test format field to ensure backwards compatibility

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

* add docstring with examples

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>

---------

Signed-off-by: Friedrich Albert Kyuri <friedrichak42@gmail.com>
Signed-off-by: Murr Kyuri <39532283+Friedrich42@users.noreply.github.com>
Co-authored-by: Jan Wozniak <wozniak.jan@gmail.com>
  • Loading branch information
fira42073 and wozniakjan authored Mar 24, 2024
1 parent 55075a7 commit 76c6ac0
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add command-line flag in Adapter to allow override of gRPC Authority Header ([#5449](https://github.com/kedacore/keda/issues/5449))
- **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375))
- **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441))
- **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633))
- **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544))

### Fixes
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.1
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kms v0.29.0 // indirect
Expand Down
152 changes: 145 additions & 7 deletions pkg/scalers/metrics_api_scaler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package scalers

import (
"bufio"
"bytes"
"context"
"encoding/xml"
"errors"
"fmt"
"io"
Expand All @@ -12,6 +15,7 @@ import (

"github.com/go-logr/logr"
"github.com/tidwall/gjson"
"gopkg.in/yaml.v3"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand All @@ -32,6 +36,7 @@ type metricsAPIScalerMetadata struct {
targetValue float64
activationTargetValue float64
url string
format APIFormat
valueLocation string
unsafeSsl bool

Expand Down Expand Up @@ -62,7 +67,27 @@ type metricsAPIScalerMetadata struct {
}

const (
methodValueQuery = "query"
methodValueQuery = "query"
valueLocationWrongErrorMsg = "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
)

type APIFormat string

// Options for APIFormat:
const (
PrometheusFormat APIFormat = "prometheus"
JSONFormat APIFormat = "json"
XMLFormat APIFormat = "xml"
YAMLFormat APIFormat = "yaml"
)

var (
supportedFormats = []APIFormat{
PrometheusFormat,
JSONFormat,
XMLFormat,
YAMLFormat,
}
)

// NewMetricsAPIScaler creates a new HTTP scaler
Expand Down Expand Up @@ -137,6 +162,16 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return nil, fmt.Errorf("no url given in metadata")
}

if val, ok := config.TriggerMetadata["format"]; ok {
meta.format = APIFormat(strings.TrimSpace(val))
if !kedautil.Contains(supportedFormats, meta.format) {
return nil, fmt.Errorf("format %s not supported", meta.format)
}
} else {
// default format is JSON for backward compatibility
meta.format = JSONFormat
}

if val, ok := config.TriggerMetadata["valueLocation"]; ok {
meta.valueLocation = val
} else {
Expand Down Expand Up @@ -211,23 +246,126 @@ func parseMetricsAPIMetadata(config *scalersconfig.ScalerConfig) (*metricsAPISca
return &meta, nil
}

// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body
func GetValueFromResponse(body []byte, valueLocation string) (float64, error) {
// GetValueFromResponse uses provided valueLocation to access the numeric value in provided body using the format specified.
func GetValueFromResponse(body []byte, valueLocation string, format APIFormat) (float64, error) {
switch format {
case PrometheusFormat:
return getValueFromPrometheusResponse(body, valueLocation)
case JSONFormat:
return getValueFromJSONResponse(body, valueLocation)
case XMLFormat:
return getValueFromXMLResponse(body, valueLocation)
case YAMLFormat:
return getValueFromYAMLResponse(body, valueLocation)
}

return 0, fmt.Errorf("format %s not supported", format)
}

// getValueFromPrometheusResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromPrometheusResponse(body []byte, valueLocation string) (float64, error) {
scanner := bufio.NewScanner(bytes.NewReader(body))
for scanner.Scan() {
line := scanner.Text()
fields := strings.Fields(line)
if len(fields) == 0 || strings.HasPrefix(fields[0], "#") {
continue
}
if len(fields) == 2 && strings.HasPrefix(fields[0], valueLocation) {
value, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return 0, err
}
return value, nil
}
}

if err := scanner.Err(); err != nil {
return 0, err
}

return 0, fmt.Errorf("value %s not found", valueLocation)
}

// getValueFromJSONResponse uses provided valueLocation to access the numeric value in provided body using GJSON
func getValueFromJSONResponse(body []byte, valueLocation string) (float64, error) {
r := gjson.GetBytes(body, valueLocation)
errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'"
if r.Type == gjson.String {
v, err := resource.ParseQuantity(r.String())
if err != nil {
return 0, fmt.Errorf(errorMsg, r.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.String())
}
return v.AsApproximateFloat64(), nil
}
if r.Type != gjson.Number {
return 0, fmt.Errorf(errorMsg, r.Type.String())
return 0, fmt.Errorf(valueLocationWrongErrorMsg, r.Type.String())
}
return r.Num, nil
}

// getValueFromXMLResponse uses provided valueLocation to access the numeric value in provided body
func getValueFromXMLResponse(body []byte, valueLocation string) (float64, error) {
var xmlMap map[string]interface{}
err := xml.Unmarshal(body, &xmlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(xmlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
}

// getValueFromYAMLResponse uses provided valueLocation to access the numeric value in provided body
// using generic ketautil.GetValueByPath
func getValueFromYAMLResponse(body []byte, valueLocation string) (float64, error) {
var yamlMap map[string]interface{}
err := yaml.Unmarshal(body, &yamlMap)
if err != nil {
return 0, err
}

path, err := kedautil.GetValueByPath(yamlMap, valueLocation)
if err != nil {
return 0, err
}

switch v := path.(type) {
case int:
return float64(v), nil
case int64:
return float64(v), nil
case float64:
return v, nil
case string:
r, err := resource.ParseQuantity(v)
if err != nil {
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
return r.AsApproximateFloat64(), nil
default:
return 0, fmt.Errorf(valueLocationWrongErrorMsg, v)
}
}

func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error) {
request, err := getMetricAPIServerRequest(ctx, s.metadata)
if err != nil {
Expand All @@ -249,7 +387,7 @@ func (s *metricsAPIScaler) getMetricValue(ctx context.Context) (float64, error)
if err != nil {
return 0, err
}
v, err := GetValueFromResponse(b, s.metadata.valueLocation)
v, err := GetValueFromResponse(b, s.metadata.valueLocation, s.metadata.format)
if err != nil {
return 0, err
}
Expand Down
66 changes: 31 additions & 35 deletions pkg/scalers/metrics_api_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,42 +123,38 @@ func TestMetricsAPIGetMetricSpecForScaling(t *testing.T) {
}

func TestGetValueFromResponse(t *testing.T) {
d := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`)
v, err := GetValueFromResponse(d, "components.0.tasks")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 32 {
t.Errorf("Expected %d got %f", 32, v)
}

v, err = GetValueFromResponse(d, "count")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 2.43 {
t.Errorf("Expected %d got %f", 2, v)
}

v, err = GetValueFromResponse(d, "components.0.str")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 64 {
t.Errorf("Expected %d got %f", 64, v)
}

v, err = GetValueFromResponse(d, "components.0.k")
if err != nil {
t.Error("Expected success but got error", err)
}
if v != 1000 {
t.Errorf("Expected %d got %f", 1000, v)
}
inputJSON := []byte(`{"components":[{"id": "82328e93e", "tasks": 32, "str": "64", "k":"1k","wrong":"NaN"}],"count":2.43}`)
inputYAML := []byte(`{components: [{id: 82328e93e, tasks: 32, str: '64', k: 1k, wrong: NaN}], count: 2.43}`)

testCases := []struct {
name string
input []byte
key string
format APIFormat
expectVal float64
expectErr bool
}{
{name: "integer", input: inputJSON, key: "count", format: JSONFormat, expectVal: 2.43},
{name: "string", input: inputJSON, key: "components.0.str", format: JSONFormat, expectVal: 64},
{name: "{}.[].{}", input: inputJSON, key: "components.0.tasks", format: JSONFormat, expectVal: 32},
{name: "invalid data", input: inputJSON, key: "components.0.wrong", format: JSONFormat, expectErr: true},

{name: "integer", input: inputYAML, key: "count", format: YAMLFormat, expectVal: 2.43},
{name: "string", input: inputYAML, key: "components.0.str", format: YAMLFormat, expectVal: 64},
{name: "{}.[].{}", input: inputYAML, key: "components.0.tasks", format: YAMLFormat, expectVal: 32},
{name: "invalid data", input: inputYAML, key: "components.0.wrong", format: YAMLFormat, expectErr: true},
}

for _, tc := range testCases {
t.Run(string(tc.format)+": "+tc.name, func(t *testing.T) {
v, err := GetValueFromResponse(tc.input, tc.key, tc.format)

if tc.expectErr {
assert.Error(t, err)
}

_, err = GetValueFromResponse(d, "components.0.wrong")
if err == nil {
t.Error("Expected error but got success", err)
assert.EqualValues(t, tc.expectVal, v)
})
}
}

Expand Down
59 changes: 59 additions & 0 deletions pkg/util/value_by_path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package util

import (
"fmt"
"strings"
)

// GetValueByPath retrieves a value from a nested map using a dot-separated path
// It also supports .number syntax to access array elements.
//
// This is a helper function for niche use cases.
// Consider using https://pkg.go.dev/k8s.io/apimachinery@v0.29.3/pkg/apis/meta/v1/unstructured#NestedFieldNoCopy instead
//
// Examples:
//
// data := map[string]interface{}{
// "a": map[string]interface{}{
// "b": []interface{}{
// map[string]interface{}{"c": 1},
// map[string]interface{}{"c": 2},
// },
// },
// }
//
// GetValueByPath(data, "a.b.0.c") // 1
// GetValueByPath(data, "not.found") // error
func GetValueByPath(data map[string]interface{}, path string) (interface{}, error) {
keys := strings.Split(path, ".")
current := data

for _, key := range keys {
val, ok := current[key]
if !ok {
return nil, fmt.Errorf("key '%s' not found in path '%s'", key, path)
}

switch typedValue := val.(type) {
case map[interface{}]interface{}:
// Convert map[interface{}]interface{} to map[string]interface{}
current = make(map[string]interface{})
for k, v := range typedValue {
current[fmt.Sprintf("%v", k)] = v
}
case []interface{}:
// Convert map[interface{}]interface{} to map[string]interface{}
current = make(map[string]interface{})
for k, v := range typedValue {
current[fmt.Sprintf("%v", k)] = v
}
case map[string]interface{}:
current = typedValue
default:
// Reached the final value
return val, nil
}
}

return nil, fmt.Errorf("path '%s' does not lead to a value", path)
}
Loading

0 comments on commit 76c6ac0

Please sign in to comment.