From 656ddb301cd241c13a7f43f24debbebc594db1f4 Mon Sep 17 00:00:00 2001 From: Christoph Deppisch Date: Thu, 6 Jun 2024 10:26:04 +0200 Subject: [PATCH] fix: Improve Knative trigger filter - Fixes #5537: Support filter attributes other than event type (e.g. source, subject, extensions) - Fixes #5529: Allows empty filter to consume the full event stream - Fixes #5446: Knative Trigger creation is only based on event type attribute - Fixes #5577: Consistently support "cloudEventsType" property in Pipes source/sink - Update documentation and improve Knative Kamelet/Pipe user guide --- .github/actions/e2e-knative-yaks/action.yml | 2 +- .github/actions/kamel-cleanup/cleanup.sh | 2 +- .github/actions/kamel-install-yaks/action.yml | 2 +- .github/workflows/knative.yml | 2 +- .../ROOT/pages/kamelets/kamelets-user.adoc | 117 ++- .../ROOT/pages/running/camel-runtimes.adoc | 4 +- .../ROOT/partials/apis/camel-k-crds.adoc | 18 + docs/modules/traits/pages/knative.adoc | 12 + e2e/knative/kamelet_test.go | 6 +- e2e/support/util/dump.go | 38 + .../knative-broker/event-source-pipe.yaml | 45 ++ .../knative-broker/knative-pipe.feature | 35 + .../common/knative-broker/log-sink-pipe.yaml | 39 + .../common/knative-broker/no-filter-pipe.yaml | 37 + .../knative-broker/source-filter-pipe.yaml | 39 + .../common/knative-broker/yaks-config.yaml | 6 + .../crds/crd-integration-platform.yaml | 34 + .../camel-k/crds/crd-integration-profile.yaml | 34 + helm/camel-k/crds/crd-integration.yaml | 17 + helm/camel-k/crds/crd-kamelet-binding.yaml | 17 + helm/camel-k/crds/crd-pipe.yaml | 17 + pkg/apis/camel/v1/knative/types.go | 2 + pkg/apis/camel/v1/trait/knative.go | 8 + .../camel/v1/trait/zz_generated.deepcopy.go | 10 + ...camel.apache.org_integrationplatforms.yaml | 34 + .../camel.apache.org_integrationprofiles.yaml | 34 + .../bases/camel.apache.org_integrations.yaml | 17 + .../camel.apache.org_kameletbindings.yaml | 17 + .../crd/bases/camel.apache.org_pipes.yaml | 17 + pkg/trait/jvm_test.go | 2 +- pkg/trait/knative.go | 63 +- pkg/trait/knative_test.go | 714 +++++++++++++++++- pkg/util/bindings/bindings_test.go | 6 + pkg/util/bindings/knative_ref.go | 75 +- pkg/util/bindings/knative_ref_test.go | 182 ++++- pkg/util/knative/knative.go | 46 +- pkg/util/knative/uri.go | 2 +- script/Makefile | 2 +- 38 files changed, 1643 insertions(+), 111 deletions(-) create mode 100644 e2e/yaks/common/knative-broker/event-source-pipe.yaml create mode 100644 e2e/yaks/common/knative-broker/knative-pipe.feature create mode 100644 e2e/yaks/common/knative-broker/log-sink-pipe.yaml create mode 100644 e2e/yaks/common/knative-broker/no-filter-pipe.yaml create mode 100644 e2e/yaks/common/knative-broker/source-filter-pipe.yaml diff --git a/.github/actions/e2e-knative-yaks/action.yml b/.github/actions/e2e-knative-yaks/action.yml index 83becbcad5..50281fe48c 100644 --- a/.github/actions/e2e-knative-yaks/action.yml +++ b/.github/actions/e2e-knative-yaks/action.yml @@ -106,7 +106,7 @@ runs: uses: ./.github/actions/kamel-install-yaks with: image-name: "docker.io/citrusframework/yaks" - version: 0.14.3 + version: 0.19.1 - id: report-problematic name: List Tests Marked As Problematic diff --git a/.github/actions/kamel-cleanup/cleanup.sh b/.github/actions/kamel-cleanup/cleanup.sh index f2a9c62e5b..d384794234 100755 --- a/.github/actions/kamel-cleanup/cleanup.sh +++ b/.github/actions/kamel-cleanup/cleanup.sh @@ -103,6 +103,6 @@ fi kubectl get crds | grep camel | awk '{print $1}' | xargs kubectl delete crd &> /dev/null # -# Remove KNative resources +# Remove Knative resources # ./.github/actions/kamel-cleanup/cleanup-knative.sh diff --git a/.github/actions/kamel-install-yaks/action.yml b/.github/actions/kamel-install-yaks/action.yml index 6414115116..a6babcea32 100644 --- a/.github/actions/kamel-install-yaks/action.yml +++ b/.github/actions/kamel-install-yaks/action.yml @@ -21,7 +21,7 @@ description: 'Install YAKS artifacts' inputs: version: description: "The YAKS version" - default: 0.14.3 + default: 0.19.1 required: false image-name: description: "The YAKS operator image name" diff --git a/.github/workflows/knative.yml b/.github/workflows/knative.yml index 5900fbff70..f70f203e68 100644 --- a/.github/workflows/knative.yml +++ b/.github/workflows/knative.yml @@ -88,7 +88,7 @@ jobs: -q "${{ github.event.inputs.log-level }}" \ -t "${{ github.event.inputs.test-filters }}" - - name: KNative Tests + - name: Knative Tests uses: ./.github/actions/e2e-knative with: cluster-config-data: ${{ secrets.E2E_CLUSTER_CONFIG }} diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc index 6011b4877d..4f8f6b4050 100644 --- a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc +++ b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc @@ -227,19 +227,19 @@ doing additional processing (such as tranformations or other enterprise integrat A common use case is that of **Knative Sources**, for which the Apache Camel developers maintain the https://knative.dev/docs/eventing/samples/apache-camel-source/[Knative CamelSources]. Kamelets represent an **evolution** of the model proposed in CamelSources, but they allow using the same declarative style of binding, via a resource named **Pipe**. -=== Binding to a Knative Destination +=== Binding to Knative -A Pipe allows to declaratively move data from a system described by a Kamelet towards a Knative destination (or other kind of destinations, in the future), or from -a Knative channel/broker to another external system described by a Kamelet. +A Pipe allows to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. +This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way. -For example, here's an example of binding: +For example, here is a pipe that connects a Kamelet Telegram source to the Knative broker: [source,yaml] ---- apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: telegram-text-source-to-channel + name: telegram-to-knative spec: source: # <1> ref: @@ -250,23 +250,118 @@ spec: botToken: the-token-here sink: # <2> ref: - kind: InMemoryChannel - apiVersion: messaging.knative.dev/v1 - name: messages + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default ---- <1> Reference to the source that provides data <2> Reference to the sink where data should be sent to This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and -makes sure that messages produced by the Kamelet are forwarded to the Knative **InMemoryChannel** named "messages". +makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default". + +Note that source and sink are specified as standard **Kubernetes object references** in a declarative way. + +Knative eventing uses CloudEvents data format by default. +You may want to set some properties that specify the event attributes such as the event type. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.telegram.events # <1> +---- +<1> Sets the event type attribute of the CloudEvent produced by this Pipe -Note that source and sink are specified declaratively as standard **Kubernetes object references**. +This way you may specify event attributes before publishing to the Knative broker. +Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel. The example shows how we can reference the "telegram-text-source" resource in a Pipe. It's contained in the `source` section because it's a Kamelet of type "source". A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`. -**Under the covers, a Pipe creates an Integration** resource that implements the binding, but this is transparent to the end user. +**Under the covers, a Pipe creates an Integration** resource that implements the binding, but all details of how to connect with +Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a `SinkBinding` concept +under the covers in order to retrieve the Knative broker endpoint URL. + +In the same way you can also connect a Kamelet source to a Knative channel. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative-channel +spec: + source: # <1> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: # <2> + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: messages +---- +<1> Reference to the source that provides data +<2> Reference to the Knative channel that acts as the sink where data should be sent to + +When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe. +Events consumed from Knative event stream will be pushed to the given sink of the Pipe. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: knative-to-slack +spec: + source: # <1> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + sink: # <2> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: slack-sink + properties: + channel: "#my-channel" + webhookUrl: the-webhook-url +---- +<1> Reference to the Knative broker source that provides data +<2> Reference to the sink where data should be sent to + +Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. +In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API. + +When consuming events from the Knative broker you most likely need to filter and select the events to process. +You can do that with the properties set on the Knative broker source reference. +The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions. + +In the background Camel K will automatically create a Knative Trigger for the Pipe that uses the filter attributes accordingly. === Binding to a Kafka Topic diff --git a/docs/modules/ROOT/pages/running/camel-runtimes.adoc b/docs/modules/ROOT/pages/running/camel-runtimes.adoc index c479bfa31c..3eacd79873 100644 --- a/docs/modules/ROOT/pages/running/camel-runtimes.adoc +++ b/docs/modules/ROOT/pages/running/camel-runtimes.adoc @@ -143,13 +143,13 @@ NOTE: this is a best effort analysis taking as reference the work available in v |v |x -|KNative Service +|Knative Service |x |x |x |v -|KNative +|Knative |x |v |x diff --git a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc index abe5aee8c4..89c648300b 100644 --- a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc +++ b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc @@ -7737,6 +7737,24 @@ Enables the camel-k-operator to set the "bindings.knative.dev/include=true" labe As Knative requires this label to perform injection of K_SINK URL into the service. If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true) +|`filters` + +[]string +| + + +Sets filter attributes on the event stream (such as event type, source, subject and so on). +A list of key-value pairs that represent filter attributes and its values. +The syntax is KEY=VALUE, e.g., `source="my.source"`. +Filter attributes get set on the Knative trigger that is being created as part of this integration. + +|`filterEventType` + +bool +| + + +Enables the default filtering for the Knative trigger using the event type +If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true) + |=== diff --git a/docs/modules/traits/pages/knative.adoc b/docs/modules/traits/pages/knative.adoc index d22f19f07f..49b764ccb6 100755 --- a/docs/modules/traits/pages/knative.adoc +++ b/docs/modules/traits/pages/knative.adoc @@ -87,6 +87,18 @@ It's enabled by default when the integration targets a single sink As Knative requires this label to perform injection of K_SINK URL into the service. If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true) +| knative.filters +| []string +| Sets filter attributes on the event stream (such as event type, source, subject and so on). +A list of key-value pairs that represent filter attributes and its values. +The syntax is KEY=VALUE, e.g., `source="my.source"`. +Filter attributes get set on the Knative trigger that is being created as part of this integration. + +| knative.filter-event-type +| bool +| Enables the default filtering for the Knative trigger using the event type +If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true) + |=== // End of autogenerated code - DO NOT EDIT! (configuration) diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go index 6d48306380..1ae16ffe22 100644 --- a/e2e/knative/kamelet_test.go +++ b/e2e/knative/kamelet_test.go @@ -47,16 +47,16 @@ func TestKameletChange(t *testing.T) { timerSource := "my-timer-source" g.Expect(CreateTimerKamelet(t, ctx, operatorID, ns, timerSource)()).To(Succeed()) g.Expect(CreateKnativeChannel(t, ctx, ns, knChannel)()).To(Succeed()) - // Consumer route that will read from the KNative channel + // Consumer route that will read from the Knative channel g.Expect(KamelRunWithID(t, ctx, operatorID, ns, "files/test-kamelet-display.groovy", "-w").Execute()).To(Succeed()) g.Eventually(IntegrationPodPhase(t, ctx, ns, "test-kamelet-display")).Should(Equal(corev1.PodRunning)) // Create the Pipe - g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKNative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed()) + g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKnative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed()) g.Eventually(IntegrationPodPhase(t, ctx, ns, timerPipe)).Should(Equal(corev1.PodRunning)) g.Eventually(IntegrationConditionStatus(t, ctx, ns, timerPipe, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue)) // Consume the message - g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKNative!")) + g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKnative!")) g.Eventually(PipeCondition(t, ctx, ns, timerPipe, v1.PipeConditionReady), TestTimeoutMedium).Should(And( WithTransform(PipeConditionStatusExtract, Equal(corev1.ConditionTrue)), diff --git a/e2e/support/util/dump.go b/e2e/support/util/dump.go index a5741e697f..75b144db29 100644 --- a/e2e/support/util/dump.go +++ b/e2e/support/util/dump.go @@ -35,9 +35,12 @@ import ( routev1 "github.com/openshift/api/route/v1" olm "github.com/operator-framework/api/pkg/operators/v1alpha1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/client/camel/clientset/versioned" + "github.com/apache/camel-k/v2/pkg/util/knative" "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/openshift" ) @@ -202,6 +205,41 @@ func Dump(ctx context.Context, c client.Client, ns string, t *testing.T) error { t.Logf("---\n%s\n---\n", string(pdata)) } + // Knative resources + if installed, _ := knative.IsEventingInstalled(c); installed { + var trgs eventingv1.TriggerList + err = c.List(ctx, &trgs) + if err != nil { + return err + } + t.Logf("Found %d Knative trigger:\n", len(trgs.Items)) + for _, p := range trgs.Items { + ref := p + pdata, err := kubernetes.ToYAMLNoManagedFields(&ref) + if err != nil { + return err + } + t.Logf("---\n%s\n---\n", string(pdata)) + } + } + + if installed, _ := knative.IsServingInstalled(c); installed { + var ksrvs servingv1.ServiceList + err = c.List(ctx, &ksrvs) + if err != nil { + return err + } + t.Logf("Found %d Knative services:\n", len(ksrvs.Items)) + for _, p := range ksrvs.Items { + ref := p + pdata, err := kubernetes.ToYAMLNoManagedFields(&ref) + if err != nil { + return err + } + t.Logf("---\n%s\n---\n", string(pdata)) + } + } + // CamelCatalogs cats, err := camelClient.CamelV1().CamelCatalogs(ns).List(ctx, metav1.ListOptions{}) if err != nil { diff --git a/e2e/yaks/common/knative-broker/event-source-pipe.yaml b/e2e/yaks/common/knative-broker/event-source-pipe.yaml new file mode 100644 index 0000000000..ace8c3075f --- /dev/null +++ b/e2e/yaks/common/knative-broker/event-source-pipe.yaml @@ -0,0 +1,45 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: event-source-pipe +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: timer-source + properties: + period: 1000 + message: "Hello this is event-1!" + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-action + properties: + showHeaders: true + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + ce.override.ce-source: org.apache.camel diff --git a/e2e/yaks/common/knative-broker/knative-pipe.feature b/e2e/yaks/common/knative-broker/knative-pipe.feature new file mode 100644 index 0000000000..de293337aa --- /dev/null +++ b/e2e/yaks/common/knative-broker/knative-pipe.feature @@ -0,0 +1,35 @@ +Feature: Pipes connecting with Knative broker + + Background: + Given create Knative broker default + Given Knative broker default is running + Given Disable auto removal of Camel K resources + Given Disable auto removal of Kubernetes resources + Given Camel K resource polling configuration + | maxAttempts | 60 | + | delayBetweenAttempts | 3000 | + + Scenario: Pipe sends messages to the broker + Given load Pipe event-source-pipe.yaml + Then Camel K integration event-source-pipe should be running + + Scenario: Pipe receives events from the broker + Given load Pipe log-sink-pipe.yaml + Then Camel K integration log-sink-pipe should be running + And Camel K integration log-sink-pipe should print Hello this is event-1! + + Scenario: Pipe receives events with source filter + Given load Pipe source-filter-pipe.yaml + Then Camel K integration source-filter-pipe should be running + And Camel K integration source-filter-pipe should Hello this is event-1! + + Scenario: Pipe receives all events from the broker + Given load Pipe no-filter-pipe.yaml + Then Camel K integration no-filter-pipe should be running + And Camel K integration no-filter-pipe should print Hello this is event-1! + + Scenario: Remove resources + Given delete Camel K integration event-source-pipe + Given delete Camel K integration log-sink-pipe + Given delete Camel K integration source-filter-pipe + Given delete Camel K integration no-filter-pipe diff --git a/e2e/yaks/common/knative-broker/log-sink-pipe.yaml b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml new file mode 100644 index 0000000000..dba1cf6827 --- /dev/null +++ b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml @@ -0,0 +1,39 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: log-sink-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/no-filter-pipe.yaml b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml new file mode 100644 index 0000000000..3638543e79 --- /dev/null +++ b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: no-filter-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/source-filter-pipe.yaml b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml new file mode 100644 index 0000000000..12a78a6fb9 --- /dev/null +++ b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml @@ -0,0 +1,39 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: source-filter-pipe +spec: + source: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + source: org.apache.camel + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: log-sink + properties: + multiline: true + showHeaders: true + showBodyType: false + showExchangePattern: false diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml b/e2e/yaks/common/knative-broker/yaks-config.yaml index cd754db649..6234c8aec2 100644 --- a/e2e/yaks/common/knative-broker/yaks-config.yaml +++ b/e2e/yaks/common/knative-broker/yaks-config.yaml @@ -18,6 +18,12 @@ config: namespace: temporary: true + runtime: + resources: + - event-source-pipe.yaml + - log-sink-pipe.yaml + - source-filter-pipe.yaml + - no-filter-pipe.yaml post: - name: print dump if: env:CI=true && failure() diff --git a/helm/camel-k/crds/crd-integration-platform.yaml b/helm/camel-k/crds/crd-integration-platform.yaml index eeeec4364b..3107e80cbd 100644 --- a/helm/camel-k/crds/crd-integration-platform.yaml +++ b/helm/camel-k/crds/crd-integration-platform.yaml @@ -1395,11 +1395,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3392,11 +3409,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-integration-profile.yaml b/helm/camel-k/crds/crd-integration-profile.yaml index c827dbd52e..2c24920a10 100644 --- a/helm/camel-k/crds/crd-integration-profile.yaml +++ b/helm/camel-k/crds/crd-integration-profile.yaml @@ -1272,11 +1272,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3152,11 +3169,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-integration.yaml b/helm/camel-k/crds/crd-integration.yaml index 40b06eff20..26fe736411 100644 --- a/helm/camel-k/crds/crd-integration.yaml +++ b/helm/camel-k/crds/crd-integration.yaml @@ -7336,11 +7336,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/helm/camel-k/crds/crd-kamelet-binding.yaml b/helm/camel-k/crds/crd-kamelet-binding.yaml index fbfba55cab..202a701edc 100644 --- a/helm/camel-k/crds/crd-kamelet-binding.yaml +++ b/helm/camel-k/crds/crd-kamelet-binding.yaml @@ -7624,12 +7624,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/helm/camel-k/crds/crd-pipe.yaml b/helm/camel-k/crds/crd-pipe.yaml index 29962f2686..c2648f8c8b 100644 --- a/helm/camel-k/crds/crd-pipe.yaml +++ b/helm/camel-k/crds/crd-pipe.yaml @@ -7622,12 +7622,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/apis/camel/v1/knative/types.go b/pkg/apis/camel/v1/knative/types.go index 2fb6b96655..0259f1fa32 100644 --- a/pkg/apis/camel/v1/knative/types.go +++ b/pkg/apis/camel/v1/knative/types.go @@ -61,6 +61,8 @@ const ( CamelServiceTypeChannel CamelServiceType = "channel" // CamelServiceTypeEvent is used when the target service is the Knative broker. CamelServiceTypeEvent CamelServiceType = "event" + // CamelCloudEventTypeDefault is used as a default value for the CloudEvent type. + CamelCloudEventTypeDefault string = "org.apache.camel.event" ) func (s CamelServiceType) ResourceDescription(subject string) string { diff --git a/pkg/apis/camel/v1/trait/knative.go b/pkg/apis/camel/v1/trait/knative.go index b8ce7fe46c..2681ac982a 100644 --- a/pkg/apis/camel/v1/trait/knative.go +++ b/pkg/apis/camel/v1/trait/knative.go @@ -60,4 +60,12 @@ type KnativeTrait struct { // As Knative requires this label to perform injection of K_SINK URL into the service. // If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true) NamespaceLabel *bool `property:"namespace-label" json:"namespaceLabel,omitempty"` + // Sets filter attributes on the event stream (such as event type, source, subject and so on). + // A list of key-value pairs that represent filter attributes and its values. + // The syntax is KEY=VALUE, e.g., `source="my.source"`. + // Filter attributes get set on the Knative trigger that is being created as part of this integration. + Filters []string `property:"filters" json:"filters,omitempty"` + // Enables the default filtering for the Knative trigger using the event type + // If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true) + FilterEventType *bool `property:"filter-event-type" json:"filterEventType,omitempty"` } diff --git a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go index c8c03773ea..a8db3093a2 100644 --- a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go +++ b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go @@ -706,6 +706,16 @@ func (in *KnativeTrait) DeepCopyInto(out *KnativeTrait) { *out = new(bool) **out = **in } + if in.Filters != nil { + in, out := &in.Filters, &out.Filters + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.FilterEventType != nil { + in, out := &in.FilterEventType, &out.FilterEventType + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KnativeTrait. diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml index eeeec4364b..3107e80cbd 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml @@ -1395,11 +1395,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3392,11 +3409,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml index c827dbd52e..2c24920a10 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml @@ -1272,11 +1272,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to @@ -3152,11 +3169,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml index 40b06eff20..26fe736411 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml @@ -7336,11 +7336,28 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the event + stream when no other filter criteria is given. (default: + true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream (such + as event type, source, subject and so on). A list of key-value + pairs that represent filter attributes and its values. The + syntax is KEY=VALUE, e.g., `source="my.source"`. Filter + attributes get set on the Knative trigger that is being + created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace As Knative requires this label to diff --git a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml index fbfba55cab..202a701edc 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml @@ -7624,12 +7624,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml index 29962f2686..c2648f8c8b 100644 --- a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml +++ b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml @@ -7622,12 +7622,29 @@ spec: items: type: string type: array + filterEventType: + description: 'Enables the default filtering for the Knative + trigger using the event type If this is true, the created + Knative trigger uses the event type as a filter on the + event stream when no other filter criteria is given. + (default: true)' + type: boolean filterSourceChannels: description: Enables filtering on events based on the header "ce-knativehistory". Since this header has been removed in newer versions of Knative, filtering is disabled by default. type: boolean + filters: + description: Sets filter attributes on the event stream + (such as event type, source, subject and so on). A list + of key-value pairs that represent filter attributes + and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`. + Filter attributes get set on the Knative trigger that + is being created as part of this integration. + items: + type: string + type: array namespaceLabel: description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true" label to the namespace diff --git a/pkg/trait/jvm_test.go b/pkg/trait/jvm_test.go index ff5fae2798..9206e45148 100644 --- a/pkg/trait/jvm_test.go +++ b/pkg/trait/jvm_test.go @@ -309,7 +309,7 @@ func TestApplyJvmTraitWithDeploymentResource(t *testing.T) { }, d.Spec.Template.Spec.Containers[0].Args) } -func TestApplyJvmTraitWithKNativeResource(t *testing.T) { +func TestApplyJvmTraitWithKnativeResource(t *testing.T) { trait, environment := createNominalJvmTest(v1.IntegrationKitTypePlatform) s := serving.Service{} diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index db87ad5045..c8aa3fc87e 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/apache/camel-k/v2/pkg/util/boolean" + "github.com/apache/camel-k/v2/pkg/util/property" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -501,41 +502,53 @@ func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.Came } func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string, path string) error { - // TODO extend to additional filters too, to filter them at source and not at destination found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool { return trigger.Spec.Broker == ref.Name && - trigger.Spec.Filter != nil && - trigger.Spec.Filter.Attributes["type"] == eventType // can be also missing + trigger.Name == knativeutil.GetTriggerName(ref.Name, e.Integration.Name, eventType) }) - if !found { - if ref.Namespace == "" { - ref.Namespace = e.Integration.Namespace - } - controllerStrategy, err := e.DetermineControllerStrategy() + if found { + return nil + } + + if ref.Namespace == "" { + ref.Namespace = e.Integration.Namespace + } + + controllerStrategy, err := e.DetermineControllerStrategy() + if err != nil { + return err + } + + var attributes = make(map[string]string) + for _, filterExpression := range t.Filters { + key, value := property.SplitPropertyFileEntry(filterExpression) + attributes[key] = value + } + + if _, eventTypeSpecified := attributes["type"]; !eventTypeSpecified && pointer.BoolDeref(t.FilterEventType, true) && eventType != "" { + // Apply default trigger filter attribute for the event type + attributes["type"] = eventType + } + + var trigger *eventing.Trigger + switch controllerStrategy { + case ControllerStrategyKnativeService: + trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, path, attributes) if err != nil { return err } - - var trigger *eventing.Trigger - switch controllerStrategy { - case ControllerStrategyKnativeService: - trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, path) - if err != nil { - return err - } - case ControllerStrategyDeployment: - trigger, err = knativeutil.CreateServiceTrigger(*ref, e.Integration.Name, eventType, path) - if err != nil { - return err - } - default: - return fmt.Errorf("failed to create Knative trigger: unsupported controller strategy %s", controllerStrategy) + case ControllerStrategyDeployment: + trigger, err = knativeutil.CreateServiceTrigger(*ref, e.Integration.Name, eventType, path, attributes) + if err != nil { + return err } - - e.Resources.Add(trigger) + default: + return fmt.Errorf("failed to create Knative trigger: unsupported controller strategy %s", controllerStrategy) } + e.Resources.Add(trigger) + return nil } diff --git a/pkg/trait/knative_test.go b/pkg/trait/knative_test.go index d580a3f24e..bcecc55818 100644 --- a/pkg/trait/knative_test.go +++ b/pkg/trait/knative_test.go @@ -43,6 +43,7 @@ import ( "github.com/apache/camel-k/v2/pkg/client" "github.com/apache/camel-k/v2/pkg/util/boolean" "github.com/apache/camel-k/v2/pkg/util/camel" + "github.com/apache/camel-k/v2/pkg/util/knative" k8sutils "github.com/apache/camel-k/v2/pkg/util/kubernetes" "github.com/apache/camel-k/v2/pkg/util/test" ) @@ -272,7 +273,7 @@ func TestKnativeEnvConfigurationFromSource(t *testing.T) { })) } -func TestKnativeTriggerConfiguration(t *testing.T) { +func TestKnativeTriggerExplicitFilterConfig(t *testing.T) { catalog, err := camel.DefaultCatalog() require.NoError(t, err) @@ -316,6 +317,7 @@ func TestKnativeTriggerConfiguration(t *testing.T) { Trait: traitv1.Trait{ Enabled: pointer.Bool(true), }, + Filters: []string{"source=my-source"}, }, }, }, @@ -352,20 +354,688 @@ func TestKnativeTriggerConfiguration(t *testing.T) { assert.NotEmpty(t, environment.ExecutedTraits) assert.NotNil(t, environment.GetTrait("knative")) - assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { - matching := true + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) - matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) - matching = matching && assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) - matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) - matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) - matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + assert.NotNil(t, trigger) - return matching - })) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 2) + assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "evt.type") + assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source") +} + +func TestKnativeTriggerExplicitFilterConfigNoEventTypeFilter(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + Filters: []string{"source=my-source"}, + FilterEventType: pointer.Bool(false), + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source") +} + +func TestKnativeTriggerDefaultEventTypeFilter(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"]) +} + +func TestKnativeTriggerDefaultEventTypeFilterDisabled(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + FilterEventType: pointer.Bool(false), + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.Nil(t, trigger.Spec.Filter) } -func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) { +func TestKnativeMultipleTrigger(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type.1") + .log("${body}"); + + from("knative:event/evt.type.2") + .log("${body}"); + + from("knative:event") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + triggerNames := make([]string, 0) + environment.Resources.VisitKnativeTrigger(func(trigger *eventing.Trigger) { + triggerNames = append(triggerNames, trigger.Name) + }) + + assert.Len(t, triggerNames, 3) + + trigger1 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.1") + }) + + assert.NotNil(t, trigger1) + assert.Equal(t, "default", trigger1.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger1.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype1", trigger1.Name) + + assert.NotNil(t, trigger1.Spec.Filter) + assert.Len(t, trigger1.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"]) + + trigger2 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.2") + }) + + assert.NotNil(t, trigger2) + assert.Equal(t, "default", trigger2.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger2.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype2", trigger2.Name) + + assert.NotNil(t, trigger2.Spec.Filter) + assert.Len(t, trigger2.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"]) + + trigger3 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger3) + assert.Equal(t, "default", trigger3.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger3.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger3.Name) + + assert.Nil(t, trigger3.Spec.Filter) +} + +func TestKnativeMultipleTriggerAdditionalFilterConfig(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event/evt.type.1") + .log("${body}"); + + from("knative:event/evt.type.2") + .log("${body}"); + + from("knative:event") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + Filters: []string{"subject=Hello"}, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + triggerNames := make([]string, 0) + environment.Resources.VisitKnativeTrigger(func(trigger *eventing.Trigger) { + triggerNames = append(triggerNames, trigger.Name) + }) + + assert.Len(t, triggerNames, 3) + + trigger1 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.1") + }) + + assert.NotNil(t, trigger1) + assert.Equal(t, "default", trigger1.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger1.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype1", trigger1.Name) + + assert.NotNil(t, trigger1.Spec.Filter) + assert.Len(t, trigger1.Spec.Filter.Attributes, 2) + assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"]) + assert.Equal(t, "Hello", trigger1.Spec.Filter.Attributes["subject"]) + + trigger2 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type.2") + }) + + assert.NotNil(t, trigger2) + assert.Equal(t, "default", trigger2.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger2.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype2", trigger2.Name) + + assert.NotNil(t, trigger2.Spec.Filter) + assert.Len(t, trigger2.Spec.Filter.Attributes, 2) + assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"]) + assert.Equal(t, "Hello", trigger2.Spec.Filter.Attributes["subject"]) + + trigger3 := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger3) + assert.Equal(t, "default", trigger3.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger3.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger3.Name) + + assert.NotNil(t, trigger3.Spec.Filter) + assert.Len(t, trigger3.Spec.Filter.Attributes, 1) + assert.Equal(t, "Hello", trigger3.Spec.Filter.Attributes["subject"]) +} + +func TestKnativeTriggerNoEventType(t *testing.T) { + catalog, err := camel.DefaultCatalog() + require.NoError(t, err) + + c, err := NewFakeClient("ns") + require.NoError(t, err) + + traitCatalog := NewCatalog(c) + + environment := Environment{ + CamelCatalog: catalog, + Catalog: traitCatalog, + Client: c, + Integration: &v1.Integration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "ns", + }, + Status: v1.IntegrationStatus{ + Phase: v1.IntegrationPhaseDeploying, + }, + Spec: v1.IntegrationSpec{ + Profile: v1.TraitProfileKnative, + Sources: []v1.SourceSpec{ + { + DataSpec: v1.DataSpec{ + Name: "route.java", + Content: ` + public class CartoonMessagesMover extends RouteBuilder { + public void configure() { + from("knative:event") + .log("${body}"); + } + } + `, + }, + Language: v1.LanguageJavaSource, + }, + }, + Traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Trait: traitv1.Trait{ + Enabled: pointer.Bool(true), + }, + }, + }, + }, + }, + IntegrationKit: &v1.IntegrationKit{ + Status: v1.IntegrationKitStatus{ + Phase: v1.IntegrationKitPhaseReady, + }, + }, + Platform: &v1.IntegrationPlatform{ + Spec: v1.IntegrationPlatformSpec{ + Cluster: v1.IntegrationPlatformClusterOpenShift, + Build: v1.IntegrationPlatformBuildSpec{ + PublishStrategy: v1.IntegrationPlatformBuildPublishStrategyS2I, + Registry: v1.RegistrySpec{Address: "registry"}, + RuntimeVersion: catalog.Runtime.Version, + }, + Profile: v1.TraitProfileKnative, + }, + Status: v1.IntegrationPlatformStatus{ + Phase: v1.IntegrationPlatformPhaseReady, + }, + }, + EnvVars: make([]corev1.EnvVar, 0), + ExecutedTraits: make([]Trait, 0), + Resources: k8sutils.NewCollection(), + } + environment.Platform.ResyncStatusFullConfig() + + // don't care about conditions in this unit test + _, err = traitCatalog.apply(&environment) + + require.NoError(t, err) + assert.NotEmpty(t, environment.ExecutedTraits) + assert.NotNil(t, environment.GetTrait("knative")) + + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "") + }) + + assert.NotNil(t, trigger) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test", trigger.Name) + + assert.Nil(t, trigger.Spec.Filter) +} + +func TestKnativeTriggerNoServingAvailable(t *testing.T) { catalog, err := camel.DefaultCatalog() require.NoError(t, err) @@ -448,17 +1118,21 @@ func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) { assert.NotEmpty(t, environment.ExecutedTraits) assert.NotNil(t, environment.GetTrait("knative")) - assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { - matching := true + trigger := environment.Resources.GetKnativeTrigger(func(trigger *eventing.Trigger) bool { + return trigger.Name == knative.GetTriggerName("default", "test", "evt.type") + }) - matching = matching && assert.Equal(t, "default", trigger.Spec.Broker) - matching = matching && assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion) - matching = matching && assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) - matching = matching && assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) - matching = matching && assert.Equal(t, "default-test-evttype", trigger.Name) + assert.NotNil(t, trigger) - return matching - })) + assert.Equal(t, "default", trigger.Spec.Broker) + assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion) + assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind) + assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path) + assert.Equal(t, "default-test-evttype", trigger.Name) + + assert.NotNil(t, trigger.Spec.Filter) + assert.Len(t, trigger.Spec.Filter.Attributes, 1) + assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"]) } func TestKnativePlatformHttpConfig(t *testing.T) { diff --git a/pkg/util/bindings/bindings_test.go b/pkg/util/bindings/bindings_test.go index 7cfa78c868..0cd7e35165 100644 --- a/pkg/util/bindings/bindings_test.go +++ b/pkg/util/bindings/bindings_test.go @@ -125,6 +125,12 @@ func TestBindings(t *testing.T) { }), }, uri: "knative:event/myeventtype?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + traits: v1.Traits{ + Knative: &traitv1.KnativeTrait{ + Filters: []string{"type=myeventtype"}, + FilterEventType: pointer.Bool(true), + }, + }, }, { endpointType: v1.EndpointTypeSource, diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go index 301e62099e..2e2adba9a0 100644 --- a/pkg/util/bindings/knative_ref.go +++ b/pkg/util/bindings/knative_ref.go @@ -24,7 +24,9 @@ import ( v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" knativeapis "github.com/apache/camel-k/v2/pkg/apis/camel/v1/knative" - v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait" + "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1" + "github.com/apache/camel-k/v2/pkg/util/property" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -42,8 +44,6 @@ func (k KnativeRefBindingProvider) ID() string { } // Translate --. -// -//nolint:dupl func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx EndpointContext, e v1.Endpoint) (*Binding, error) { if e.Ref == nil { // works only on refs @@ -78,13 +78,9 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx End if props == nil { props = make(map[string]string) } - if props["apiVersion"] == "" { - props["apiVersion"] = e.Ref.APIVersion - } - if props["kind"] == "" { - props["kind"] = e.Ref.Kind - } + var filterEventType = true + var filterExpressions = make([]string, 0) var serviceURI string // TODO: refactor @@ -93,26 +89,67 @@ func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx End if props["name"] == "" { props["name"] = e.Ref.Name } + + if endpointCtx.Type == v1.EndpointTypeSource { + // Configure trigger filter attributes for the Knative event source + for key, value := range props { + if key == "cloudEventsType" { + // cloudEventsType is a synonym for type filter attribute + filterExpressions = append(filterExpressions, fmt.Sprintf("type=%s", value)) + } else if key != "name" { + filterExpressions = append(filterExpressions, fmt.Sprintf("%s=%s", key, value)) + } + } + } + if eventType, ok := props["type"]; ok { - // consume prop + // consume the type property and set it as URI path parameter delete(props, "type") serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, eventType) + } else if endpointCtx.Type == v1.EndpointTypeSink || endpointCtx.Type == v1.EndpointTypeAction { + // Allowing no event type, but it can fail. See https://github.com/apache/camel-k-runtime/issues/536 + serviceURI = fmt.Sprintf("knative:%s", *serviceType) + } else if cloudEventsType, found := props["cloudEventsType"]; found { + // set the cloud events type as URI path parameter, but keep it also as URI query param + serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, cloudEventsType) } else { - if endpointCtx.Type == v1.EndpointTypeSink || endpointCtx.Type == v1.EndpointTypeAction { - // Allowing no event type, but it can fail. See https://github.com/apache/camel-k/v2-runtime/issues/536 - serviceURI = fmt.Sprintf("knative:%s", *serviceType) - } else { - return nil, errors.New(`property "type" must be provided when reading from the Broker`) - } + // Use default event type as a service URI path parameter as we need it for the Camel endpoint, but do not filter by the event type + filterEventType = false + serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, knativeapis.CamelCloudEventTypeDefault) } } else { serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, url.PathEscape(e.Ref.Name)) } + // Remove filter attributes from props to avoid adding them to the service URI query params + for _, exp := range filterExpressions { + key, _ := property.SplitPropertyFileEntry(exp) + delete(props, key) + } + + // Enrich service URI query params if not set + if props["apiVersion"] == "" { + props["apiVersion"] = e.Ref.APIVersion + } + if props["kind"] == "" { + props["kind"] = e.Ref.Kind + } + serviceURI = uri.AppendParameters(serviceURI, props) - return &Binding{ + var binding = Binding{ URI: serviceURI, - }, nil + } + + if len(filterExpressions) > 0 || !filterEventType { + binding.Traits = v1.Traits{ + Knative: &trait.KnativeTrait{ + Filters: filterExpressions, + FilterEventType: &filterEventType, + }, + } + } + + return &binding, nil } func isKnownKnativeResource(ref *corev1.ObjectReference) (bool, error) { @@ -149,8 +186,6 @@ func (k V1alpha1KnativeRefBindingProvider) ID() string { // Translate --. // Deprecated. -// -//nolint:dupl func (k V1alpha1KnativeRefBindingProvider) Translate(ctx V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e v1alpha1.Endpoint) (*Binding, error) { if e.Ref == nil { // works only on refs diff --git a/pkg/util/bindings/knative_ref_test.go b/pkg/util/bindings/knative_ref_test.go index 6ddadad491..de44869f59 100644 --- a/pkg/util/bindings/knative_ref_test.go +++ b/pkg/util/bindings/knative_ref_test.go @@ -22,20 +22,170 @@ import ( "fmt" "testing" - "github.com/apache/camel-k/v2/pkg/util/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/util/test" ) -func TestKnativeRefBinding(t *testing.T) { +func TestKnativeRefAsSource(t *testing.T) { testcases := []struct { + name string + endpoint camelv1.Endpoint + uri string + filters []string + filterEventType *bool + }{ + { + name: "broker", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + }, + uri: "knative:event/org.apache.camel.event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filterEventType: pointer.Bool(false), + }, + { + name: "broker-name-property", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"name": "my-broker"}), + }, + uri: "knative:event/org.apache.camel.event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker", + filterEventType: pointer.Bool(false), + }, + { + name: "event-type-filter", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}), + }, + uri: "knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"type=org.apache.camel.myevent"}, + }, + { + name: "cloud-events-type-filter", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"cloudEventsType": "org.apache.camel.cloudevent"}), + }, + uri: "knative:event/org.apache.camel.cloudevent?apiVersion=eventing.knative.dev%2Fv1&cloudEventsType=org.apache.camel.cloudevent&kind=Broker&name=default", + filters: []string{"type=org.apache.camel.cloudevent"}, + }, + { + name: "event-filters", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"source": "my-source", "subject": "mySubject"}), + }, + uri: "knative:event/org.apache.camel.event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"source=my-source", "subject=mySubject"}, + filterEventType: pointer.Bool(false), + }, + { + name: "event-extension-filters", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"myextension": "foo"}), + }, + uri: "knative:event/org.apache.camel.event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + filters: []string{"myextension=foo"}, + filterEventType: pointer.Bool(false), + }, + { + name: "channel", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Channel", + Name: "mychannel", + APIVersion: "messaging.knative.dev/v1", + }, + }, + uri: "knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel", + }, + { + name: "service", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Service", + Name: "myservice", + APIVersion: "serving.knative.dev/v1", + }, + }, + uri: "knative:endpoint/myservice?apiVersion=serving.knative.dev%2Fv1&kind=Service", + }, + } + + for i, tc := range testcases { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := test.NewFakeClient() + require.NoError(t, err) + + bindingContext := BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: camelv1.TraitProfileKnative, + } + + binding, err := KnativeRefBindingProvider{}.Translate(bindingContext, EndpointContext{ + Type: camelv1.EndpointTypeSource, + }, tc.endpoint) + require.NoError(t, err) + assert.NotNil(t, binding) + assert.Equal(t, tc.uri, binding.URI) + + if tc.filters != nil || !pointer.BoolDeref(tc.filterEventType, true) { + assert.NotNil(t, binding.Traits.Knative) + assert.Len(t, binding.Traits.Knative.Filters, len(tc.filters)) + + for _, filterExpression := range tc.filters { + assert.Contains(t, binding.Traits.Knative.Filters, filterExpression) + } + + assert.Equal(t, pointer.BoolDeref(binding.Traits.Knative.FilterEventType, true), pointer.BoolDeref(tc.filterEventType, true)) + } + }) + } +} + +func TestKnativeRefAsSink(t *testing.T) { + testcases := []struct { + name string endpoint camelv1.Endpoint uri string }{ { + name: "broker", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Broker", @@ -46,6 +196,31 @@ func TestKnativeRefBinding(t *testing.T) { uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", }, { + name: "broker-name-property", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"name": "my-broker"}), + }, + uri: "knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker", + }, + { + name: "event-type", + endpoint: camelv1.Endpoint{ + Ref: &v1.ObjectReference{ + Kind: "Broker", + Name: "default", + APIVersion: "eventing.knative.dev/v1", + }, + Properties: asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}), + }, + uri: "knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default", + }, + { + name: "channel", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Channel", @@ -56,6 +231,7 @@ func TestKnativeRefBinding(t *testing.T) { uri: "knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel", }, { + name: "service", endpoint: camelv1.Endpoint{ Ref: &v1.ObjectReference{ Kind: "Service", @@ -68,7 +244,7 @@ func TestKnativeRefBinding(t *testing.T) { } for i, tc := range testcases { - t.Run(fmt.Sprintf("test-%d-%s", i, tc.endpoint.Ref.Kind), func(t *testing.T) { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go index b72ae10245..e6623ea1f2 100644 --- a/pkg/util/knative/knative.go +++ b/pkg/util/knative/knative.go @@ -76,47 +76,36 @@ func CreateSubscription(channelReference corev1.ObjectReference, serviceName str } // CreateServiceTrigger create Knative trigger with arbitrary Kubernetes Service as a subscriber - usually used when no Knative Serving is available on the cluster. -func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { +func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { subscriberRef := duckv1.KReference{ APIVersion: "v1", Kind: "Service", Name: serviceName, } - return CreateTrigger(brokerReference, subscriberRef, eventType, path) + return CreateTrigger(brokerReference, subscriberRef, eventType, path, attributes) } // CreateKnativeServiceTrigger create Knative trigger with Knative Serving Service as a subscriber - default option when Knative Serving is available on the cluster. -func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string) (*eventing.Trigger, error) { +func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, serviceName string, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { subscriberRef := duckv1.KReference{ APIVersion: serving.SchemeGroupVersion.String(), Kind: "Service", Name: serviceName, } - return CreateTrigger(brokerReference, subscriberRef, eventType, path) + return CreateTrigger(brokerReference, subscriberRef, eventType, path, attributes) } -func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1.KReference, eventType string, path string) (*eventing.Trigger, error) { - nameSuffix := "" - var attributes map[string]string - if eventType != "" { - nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType)) - attributes = map[string]string{ - "type": eventType, - } - } - return &eventing.Trigger{ +func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1.KReference, eventType string, path string, attributes map[string]string) (*eventing.Trigger, error) { + trigger := eventing.Trigger{ TypeMeta: metav1.TypeMeta{ APIVersion: eventing.SchemeGroupVersion.String(), Kind: "Trigger", }, ObjectMeta: metav1.ObjectMeta{ Namespace: brokerReference.Namespace, - Name: brokerReference.Name + "-" + subscriberRef.Name + nameSuffix, + Name: GetTriggerName(brokerReference.Name, subscriberRef.Name, eventType), }, Spec: eventing.TriggerSpec{ - Filter: &eventing.TriggerFilter{ - Attributes: attributes, - }, Broker: brokerReference.Name, Subscriber: duckv1.Destination{ Ref: &subscriberRef, @@ -125,7 +114,24 @@ func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef duckv1. }, }, }, - }, nil + } + + if len(attributes) > 0 { + trigger.Spec.Filter = &eventing.TriggerFilter{ + Attributes: attributes, + } + } + + return &trigger, nil +} + +func GetTriggerName(brokerName string, subscriberName string, eventType string) string { + nameSuffix := "" + if eventType != "" { + nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType)) + } + + return brokerName + "-" + subscriberName + nameSuffix } func CreateSinkBinding(source corev1.ObjectReference, target corev1.ObjectReference) *sources.SinkBinding { @@ -203,7 +209,7 @@ func getSinkURI(ctx context.Context, c client.Client, sink *corev1.ObjectReferen } objIdentifier := fmt.Sprintf("\"%s/%s\" (%s)", u.GetNamespace(), u.GetName(), u.GroupVersionKind()) - // Special case v1/Service to allow it be addressable + // Special case v1/Service allowing it to be addressable if u.GroupVersionKind().Kind == "Service" && u.GroupVersionKind().Group == "" && u.GroupVersionKind().Version == "v1" { return fmt.Sprintf("http://%s.%s.svc/", u.GetName(), u.GetNamespace()), nil } diff --git a/pkg/util/knative/uri.go b/pkg/util/knative/uri.go index 73f4fb2b2b..f128d5e54d 100644 --- a/pkg/util/knative/uri.go +++ b/pkg/util/knative/uri.go @@ -84,7 +84,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, error) { }, nil } -// ExtractEventType extract the eventType from a event URI. +// ExtractEventType extract the eventType from an event URI. func ExtractEventType(uri string) string { return matchOrEmpty(uriRegexp, 2, uri) } diff --git a/script/Makefile b/script/Makefile index 42310fcf01..c188a9514a 100644 --- a/script/Makefile +++ b/script/Makefile @@ -340,7 +340,7 @@ test-install-upgrade: do-build go test -timeout 30m -v ./e2e/install/upgrade -tags=integration $(TEST_INSTALL_RUN) $(GOTESTFMT) # -# Knative tests that require the presence of KNative configuration +# Knative tests that require the presence of Knative configuration # test-knative: do-build STAGING_RUNTIME_REPO="$(STAGING_RUNTIME_REPO)"; \