Skip to content

Commit

Permalink
Fix #1107: adding first support for Kamelets
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Jan 12, 2022
1 parent 2fbfef6 commit c0cc560
Show file tree
Hide file tree
Showing 6 changed files with 288 additions and 7 deletions.
192 changes: 188 additions & 4 deletions addons/keda/keda.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,42 @@ limitations under the License.
package keda

import (
"fmt"
"sort"
"strings"

kedav1alpha1 "github.com/apache/camel-k/addons/keda/duck/v1alpha1"
camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
camelv1alpha1 "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"github.com/apache/camel-k/pkg/kamelet/repository"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/platform"
"github.com/apache/camel-k/pkg/trait"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/kubernetes"
"github.com/apache/camel-k/pkg/util/property"
"github.com/apache/camel-k/pkg/util/source"
"github.com/apache/camel-k/pkg/util/uri"
"github.com/pkg/errors"
scase "github.com/stoewer/go-strcase"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// kameletURNTypePrefix indicates the scaler type associated to a Kamelet
kameletURNTypePrefix = "urn:keda:type:"
// kameletURNMetadataPrefix allows binding Kamelet properties to Keda metadata
kameletURNMetadataPrefix = "urn:keda:metadata:"
// kameletURNRequiredTag is used to mark properties required by Keda
kameletURNRequiredTag = "urn:keda:required"

// kameletAnnotationType is an alternative to kameletURNTypePrefix.
// To be removed when the `spec -> definition -> x-descriptors` field becomes stable.
kameletAnnotationType = "camel.apache.org/keda.type"
)

// The Keda trait can be used for automatic integration with Keda autoscalers.
//
// The Keda trait is disabled by default.
Expand Down Expand Up @@ -79,7 +103,14 @@ func (t *kedaTrait) Configure(e *trait.Environment) (bool, error) {
return false, nil
}

return true, nil
if t.Auto == nil || *t.Auto {
if err := t.populateTriggersFromKamelets(e); err != nil {
// TODO: set condition
return false, err
}
}

return len(t.Triggers) > 0, nil
}

func (t *kedaTrait) Apply(e *trait.Environment) error {
Expand Down Expand Up @@ -142,7 +173,6 @@ func (t *kedaTrait) getScaledObject(e *trait.Environment) (*kedav1alpha1.ScaledO

func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
ctrlRef := t.getTopControllerReference(e)
applier := e.Client.ServerOrClientSideApplier()
if ctrlRef.Kind == camelv1alpha1.KameletBindingKind {
// Update the KameletBinding directly (do not add it to env resources, it's the integration parent)
key := client.ObjectKey{
Expand All @@ -156,15 +186,15 @@ func (t *kedaTrait) hackControllerReplicas(e *trait.Environment) error {
if klb.Spec.Replicas == nil {
one := int32(1)
klb.Spec.Replicas = &one
if err := applier.Apply(e.Ctx, &klb); err != nil {
if err := e.Client.Update(e.Ctx, &klb); err != nil {
return err
}
}
} else {
if e.Integration.Spec.Replicas == nil {
one := int32(1)
e.Integration.Spec.Replicas = &one
if err := applier.Apply(e.Ctx, e.Integration); err != nil {
if err := e.Client.Update(e.Ctx, e.Integration); err != nil {
return err
}
}
Expand All @@ -188,3 +218,157 @@ func (t *kedaTrait) getTopControllerReference(e *trait.Environment) *v1.ObjectRe
Name: e.Integration.Name,
}
}

func (t *kedaTrait) populateTriggersFromKamelets(e *trait.Environment) error {
sources, err := kubernetes.ResolveIntegrationSources(e.Ctx, e.Client, e.Integration, e.Resources)
if err != nil {
return err
}
kameletURIs := make(map[string][]string)
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
for _, uri := range meta.FromURIs {
if kameletStr := source.ExtractKamelet(uri); kameletStr != "" && camelv1alpha1.ValidKameletName(kameletStr) {
kamelet := kameletStr
if strings.Contains(kamelet, "/") {
kamelet = kamelet[0:strings.Index(kamelet, "/")]
}
uriList := kameletURIs[kamelet]
util.StringSliceUniqueAdd(&uriList, uri)
sort.Strings(uriList)
kameletURIs[kamelet] = uriList
}
}
return true
})

if len(kameletURIs) == 0 {
return nil
}

repo, err := repository.NewForPlatform(e.Ctx, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
if err != nil {
return err
}

sortedKamelets := make([]string, 0, len(kameletURIs))
for kamelet, _ := range kameletURIs {
sortedKamelets = append(sortedKamelets, kamelet)
}
sort.Strings(sortedKamelets)
for _, kamelet := range sortedKamelets {
uris := kameletURIs[kamelet]
if err := t.populateTriggersFromKamelet(e, repo, kamelet, uris); err != nil {
return err
}
}

return nil
}

func (t *kedaTrait) populateTriggersFromKamelet(e *trait.Environment, repo repository.KameletRepository, kameletName string, uris []string) error {
kamelet, err := repo.Get(e.Ctx, kameletName)
if err != nil {
return err
} else if kamelet == nil {
return fmt.Errorf("kamelet %q not found", kameletName)
}
if kamelet.Spec.Definition == nil {
return nil
}
triggerType := t.getKedaType(kamelet)
if triggerType == "" {
return nil
}

metadataToProperty := make(map[string]string)
requiredMetadata := make(map[string]bool)
for k, def := range kamelet.Spec.Definition.Properties {
if metadataName := t.getXDescriptorValue(def.XDescriptors, kameletURNMetadataPrefix); metadataName != "" {
metadataToProperty[metadataName] = k
if req := t.isXDescriptorPresent(def.XDescriptors, kameletURNRequiredTag); req {
requiredMetadata[metadataName] = true
}
}
}
for _, uri := range uris {
if err := t.populateTriggersFromKameletURI(e, kameletName, triggerType, metadataToProperty, requiredMetadata, uri); err != nil {
return err
}
}
return nil
}

func (t *kedaTrait) populateTriggersFromKameletURI(e *trait.Environment, kameletName string, triggerType string, metadataToProperty map[string]string, requiredMetadata map[string]bool, kameletURI string) error {
metaValues := make(map[string]string, len(metadataToProperty))
for metaParam, prop := range metadataToProperty {
// From lowest priority to top
if v := e.ApplicationProperties[fmt.Sprintf("camel.kamelet.%s.%s", kameletName, prop)]; v != "" {
metaValues[metaParam] = v
}
if kameletID := uri.GetPathSegment(kameletURI, 0); kameletID != "" {
kameletSpecificKey := fmt.Sprintf("camel.kamelet.%s.%s.%s", kameletName, kameletID, prop)
if v := e.ApplicationProperties[kameletSpecificKey]; v != "" {
metaValues[metaParam] = v
}
for _, c := range e.Integration.Spec.Configuration {
if c.Type == "property" && strings.HasPrefix(c.Value, kameletSpecificKey) {
v, err := property.DecodePropertyFileValue(c.Value, kameletSpecificKey)
if err != nil {
return errors.Wrapf(err, "could not decode property %q", kameletSpecificKey)
}
metaValues[metaParam] = v
}
}
}
if v := uri.GetQueryParameter(kameletURI, prop); v != "" {
metaValues[metaParam] = v
}
}

for req := range requiredMetadata {
if _, ok := metaValues[req]; !ok {
return fmt.Errorf("metadata parameter %q is missing in configuration: it is required by Keda", req)
}
}

kebabMetaValues := make(map[string]string, len(metaValues))
for k, v := range metaValues {
kebabMetaValues[scase.KebabCase(k)] = v
}

// Add the trigger in config
trigger := kedaTrigger{
Type: triggerType,
Metadata: kebabMetaValues,
}
t.Triggers = append(t.Triggers, trigger)
return nil
}

func (t *kedaTrait) getKedaType(kamelet *camelv1alpha1.Kamelet) string {
if kamelet.Spec.Definition != nil {
triggerType := t.getXDescriptorValue(kamelet.Spec.Definition.XDescriptors, kameletURNTypePrefix)
if triggerType != "" {
return triggerType
}
}
return kamelet.Annotations[kameletAnnotationType]
}

func (t *kedaTrait) getXDescriptorValue(descriptors []string, prefix string) string {
for _, d := range descriptors {
if strings.HasPrefix(d, prefix) {
return d[len(prefix):]
}
}
return ""
}

func (t *kedaTrait) isXDescriptorPresent(descriptors []string, desc string) bool {
for _, d := range descriptors {
if d == desc {
return true
}
}
return false
}
4 changes: 3 additions & 1 deletion pkg/apis/camel/v1alpha1/jsonschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type JSONSchemaProp struct {
Enum []JSON `json:"enum,omitempty"`
Example *JSON `json:"example,omitempty"`
Nullable bool `json:"nullable,omitempty"`
// The list of descriptors that determine which UI components to use on different views
// XDescriptors is a list of extended properties that trigger a custom behavior in external systems
XDescriptors []string `json:"x-descriptors,omitempty"`
}

Expand All @@ -89,6 +89,8 @@ type JSONSchemaProps struct {
ExternalDocs *ExternalDocumentation `json:"externalDocs,omitempty"`
Schema JSONSchemaURL `json:"$schema,omitempty"`
Type string `json:"type,omitempty"`
// XDescriptors is a list of extended properties that trigger a custom behavior in external systems
XDescriptors []string `json:"x-descriptors,omitempty"`
}

// RawMessage is a raw encoded JSON value.
Expand Down
6 changes: 5 additions & 1 deletion pkg/client/serverside.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (c *defaultClient) ServerOrClientSideApplier() ServerOrClientSideApplier {
func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Object) error {
once := false
var err error
needsRetry := false
a.tryServerSideApply.Do(func() {
once = true
if err = a.serverSideApply(ctx, object); err != nil {
Expand All @@ -57,12 +58,15 @@ func (a *ServerOrClientSideApplier) Apply(ctx context.Context, object ctrl.Objec
a.hasServerSideApply.Store(false)
err = nil
} else {
a.tryServerSideApply = sync.Once{}
needsRetry = true
}
} else {
a.hasServerSideApply.Store(true)
}
})
if needsRetry {
a.tryServerSideApply = sync.Once{}
}
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/property/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,14 @@ func SplitPropertyFileEntry(entry string) (string, string) {
}
return k, v
}

// DecodePropertyFileEntry returns the decoded value corresponding to the given key in the entry.
func DecodePropertyFileValue(entry, key string) (string, error) {
p := properties.NewProperties()
p.DisableExpansion = true
if err := p.Load([]byte(entry), properties.UTF8); err != nil {
return "", err
}
val, _ := p.Get(key)
return val, nil
}
15 changes: 14 additions & 1 deletion pkg/util/uri/uri.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var uriRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:.*$`)

var pathExtractorRegexp = regexp.MustCompile(`^[a-z0-9+][a-zA-Z0-9-+]*:(?://){0,1}[^/?]+/([^?]+)(?:[?].*){0,1}$`)
var queryExtractorRegexp = `^[^?]+\?(?:|.*[&])%s=([^&]+)(?:[&].*|$)`

// HasCamelURIFormat tells if a given string may belong to a Camel URI, without checking any catalog.
Expand Down Expand Up @@ -57,6 +57,19 @@ func GetQueryParameter(uri string, param string) string {
return res
}

// GetPathSegment returns the path segment of the URI corresponding to the given position (0 based), if present
func GetPathSegment(uri string, pos int) string {
match := pathExtractorRegexp.FindStringSubmatch(uri)
if len(match) > 1 {
fullPath := match[1]
parts := strings.Split(fullPath, "/")
if pos >= 0 && pos < len(parts) {
return parts[pos]
}
}
return ""
}

func matchOrEmpty(reg *regexp.Regexp, str string) string {
match := reg.FindStringSubmatch(str)
if len(match) > 1 {
Expand Down
Loading

0 comments on commit c0cc560

Please sign in to comment.