Skip to content

Commit

Permalink
fix: Improve Knative trigger filter
Browse files Browse the repository at this point in the history
- Fixes #5537: Support filter attributes other than event type (e.g. source, subject, extensions)
- Fixes #5529: Allows empty filter to consume the full event stream
- Fixes #5446: Knative Trigger creation is only based on event type attribute
- Fixes #5577: Consistently support "cloudEventsType" property in Pipes source/sink
- Update documentation and improve Knative Kamelet/Pipe user guide
  • Loading branch information
christophd committed Jun 7, 2024
1 parent 9fd6d50 commit c62573e
Show file tree
Hide file tree
Showing 41 changed files with 1,762 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .github/actions/e2e-knative-yaks/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ runs:
uses: ./.github/actions/kamel-install-yaks
with:
image-name: "docker.io/citrusframework/yaks"
version: 0.14.3
version: 0.19.1

- id: report-problematic
name: List Tests Marked As Problematic
Expand Down
2 changes: 1 addition & 1 deletion .github/actions/kamel-cleanup/cleanup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,6 @@ fi
kubectl get crds | grep camel | awk '{print $1}' | xargs kubectl delete crd &> /dev/null

#
# Remove KNative resources
# Remove Knative resources
#
./.github/actions/kamel-cleanup/cleanup-knative.sh
2 changes: 1 addition & 1 deletion .github/actions/kamel-install-yaks/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ description: 'Install YAKS artifacts'
inputs:
version:
description: "The YAKS version"
default: 0.14.3
default: 0.19.1
required: false
image-name:
description: "The YAKS operator image name"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/knative.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
-q "${{ github.event.inputs.log-level }}" \
-t "${{ github.event.inputs.test-filters }}"
- name: KNative Tests
- name: Knative Tests
uses: ./.github/actions/e2e-knative
with:
cluster-config-data: ${{ secrets.E2E_CLUSTER_CONFIG }}
Expand Down
117 changes: 106 additions & 11 deletions docs/modules/ROOT/pages/kamelets/kamelets-user.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,19 @@ doing additional processing (such as tranformations or other enterprise integrat
A common use case is that of **Knative Sources**, for which the Apache Camel developers maintain the https://knative.dev/docs/eventing/samples/apache-camel-source/[Knative CamelSources].
Kamelets represent an **evolution** of the model proposed in CamelSources, but they allow using the same declarative style of binding, via a resource named **Pipe**.

=== Binding to a Knative Destination
=== Binding to Knative

A Pipe allows to declaratively move data from a system described by a Kamelet towards a Knative destination (or other kind of destinations, in the future), or from
a Knative channel/broker to another external system described by a Kamelet.
A Pipe allows to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet.
This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way.

For example, here's an example of binding:
For example, here is a pipe that connects a Kamelet Telegram source to the Knative broker:

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-text-source-to-channel
name: telegram-to-knative
spec:
source: # <1>
ref:
Expand All @@ -250,23 +250,118 @@ spec:
botToken: the-token-here
sink: # <2>
ref:
kind: InMemoryChannel
apiVersion: messaging.knative.dev/v1
name: messages
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
----
<1> Reference to the source that provides data
<2> Reference to the sink where data should be sent to

This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and
makes sure that messages produced by the Kamelet are forwarded to the Knative **InMemoryChannel** named "messages".
makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default".

Note that source and sink are specified as standard **Kubernetes object references** in a declarative way.

Knative eventing uses CloudEvents data format by default.
You may want to set some properties that specify the event attributes such as the event type.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-to-knative
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: telegram-text-source
properties:
botToken: the-token-here
sink:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
properties:
type: org.apache.camel.telegram.events # <1>
----
<1> Sets the event type attribute of the CloudEvent produced by this Pipe

Note that source and sink are specified declaratively as standard **Kubernetes object references**.
This way you may specify event attributes before publishing to the Knative broker.
Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel.

The example shows how we can reference the "telegram-text-source" resource in a Pipe. It's contained in the `source` section
because it's a Kamelet of type "source".
A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`.

**Under the covers, a Pipe creates an Integration** resource that implements the binding, but this is transparent to the end user.
**Under the covers, a Pipe creates an Integration** resource that implements the binding, but all details of how to connect with
Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a `SinkBinding` concept
under the covers in order to retrieve the Knative broker endpoint URL.

In the same way you can also connect a Kamelet source to a Knative channel.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-to-knative-channel
spec:
source: # <1>
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: telegram-text-source
properties:
botToken: the-token-here
sink: # <2>
ref:
kind: InMemoryChannel
apiVersion: messaging.knative.dev/v1
name: messages
----
<1> Reference to the source that provides data
<2> Reference to the Knative channel that acts as the sink where data should be sent to

When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe.
Events consumed from Knative event stream will be pushed to the given sink of the Pipe.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: knative-to-slack
spec:
source: # <1>
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
properties:
type: org.apache.camel.event.messages
sink: # <2>
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: slack-sink
properties:
channel: "#my-channel"
webhookUrl: the-webhook-url
----
<1> Reference to the Knative broker source that provides data
<2> Reference to the sink where data should be sent to

Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing.
In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API.

When consuming events from the Knative broker you most likely need to filter and select the events to process.
You can do that with the properties set on the Knative broker source reference.
The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions.

In the background Camel K will automatically create a Knative Trigger for the Pipe that uses the filter attributes accordingly.

=== Binding to a Kafka Topic

Expand Down
4 changes: 2 additions & 2 deletions docs/modules/ROOT/pages/running/camel-runtimes.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,13 @@ NOTE: this is a best effort analysis taking as reference the work available in v
|v
|x

|KNative Service
|Knative Service
|x
|x
|x
|v

|KNative
|Knative
|x
|v
|x
Expand Down
18 changes: 18 additions & 0 deletions docs/modules/ROOT/partials/apis/camel-k-crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7737,6 +7737,24 @@ Enables the camel-k-operator to set the "bindings.knative.dev/include=true" labe
As Knative requires this label to perform injection of K_SINK URL into the service.
If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true)
|`filters` +
[]string
|
Sets filter attributes on the event stream (such as event type, source, subject and so on).
A list of key-value pairs that represent filter attributes and its values.
The syntax is KEY=VALUE, e.g., `source="my.source"`.
Filter attributes get set on the Knative trigger that is being created as part of this integration.
|`filterEventType` +
bool
|
Enables the default filtering for the Knative trigger using the event type
If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true)
|===
Expand Down
12 changes: 12 additions & 0 deletions docs/modules/traits/pages/knative.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ It's enabled by default when the integration targets a single sink
As Knative requires this label to perform injection of K_SINK URL into the service.
If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true)

| knative.filters
| []string
| Sets filter attributes on the event stream (such as event type, source, subject and so on).
A list of key-value pairs that represent filter attributes and its values.
The syntax is KEY=VALUE, e.g., `source="my.source"`.
Filter attributes get set on the Knative trigger that is being created as part of this integration.

| knative.filter-event-type
| bool
| Enables the default filtering for the Knative trigger using the event type
If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true)

|===

// End of autogenerated code - DO NOT EDIT! (configuration)
6 changes: 3 additions & 3 deletions e2e/knative/kamelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ func TestKameletChange(t *testing.T) {
timerSource := "my-timer-source"
g.Expect(CreateTimerKamelet(t, ctx, operatorID, ns, timerSource)()).To(Succeed())
g.Expect(CreateKnativeChannel(t, ctx, ns, knChannel)()).To(Succeed())
// Consumer route that will read from the KNative channel
// Consumer route that will read from the Knative channel
g.Expect(KamelRunWithID(t, ctx, operatorID, ns, "files/test-kamelet-display.groovy", "-w").Execute()).To(Succeed())
g.Eventually(IntegrationPodPhase(t, ctx, ns, "test-kamelet-display")).Should(Equal(corev1.PodRunning))

// Create the Pipe
g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKNative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed())
g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, knChannelConf, "-p", "source.message=HelloKnative!", "--annotation", "trait.camel.apache.org/health.enabled=true", "--annotation", "trait.camel.apache.org/health.readiness-initial-delay=10", "--name", timerPipe).Execute()).To(Succeed())
g.Eventually(IntegrationPodPhase(t, ctx, ns, timerPipe)).Should(Equal(corev1.PodRunning))
g.Eventually(IntegrationConditionStatus(t, ctx, ns, timerPipe, v1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(corev1.ConditionTrue))
// Consume the message
g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKNative!"))
g.Eventually(IntegrationLogs(t, ctx, ns, "test-kamelet-display"), TestTimeoutShort).Should(ContainSubstring("HelloKnative!"))

g.Eventually(PipeCondition(t, ctx, ns, timerPipe, v1.PipeConditionReady), TestTimeoutMedium).Should(And(
WithTransform(PipeConditionStatusExtract, Equal(corev1.ConditionTrue)),
Expand Down
38 changes: 38 additions & 0 deletions e2e/support/util/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ import (

routev1 "github.com/openshift/api/route/v1"
olm "github.com/operator-framework/api/pkg/operators/v1alpha1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"

"github.com/apache/camel-k/v2/pkg/client"
"github.com/apache/camel-k/v2/pkg/client/camel/clientset/versioned"
"github.com/apache/camel-k/v2/pkg/util/knative"
"github.com/apache/camel-k/v2/pkg/util/kubernetes"
"github.com/apache/camel-k/v2/pkg/util/openshift"
)
Expand Down Expand Up @@ -202,6 +205,41 @@ func Dump(ctx context.Context, c client.Client, ns string, t *testing.T) error {
t.Logf("---\n%s\n---\n", string(pdata))
}

// Knative resources
if installed, _ := knative.IsEventingInstalled(c); installed {
var trgs eventingv1.TriggerList
err = c.List(ctx, &trgs)
if err != nil {
return err
}
t.Logf("Found %d Knative trigger:\n", len(trgs.Items))
for _, p := range trgs.Items {
ref := p
pdata, err := kubernetes.ToYAMLNoManagedFields(&ref)
if err != nil {
return err
}
t.Logf("---\n%s\n---\n", string(pdata))
}
}

if installed, _ := knative.IsServingInstalled(c); installed {
var ksrvs servingv1.ServiceList
err = c.List(ctx, &ksrvs)
if err != nil {
return err
}
t.Logf("Found %d Knative services:\n", len(ksrvs.Items))
for _, p := range ksrvs.Items {
ref := p
pdata, err := kubernetes.ToYAMLNoManagedFields(&ref)
if err != nil {
return err
}
t.Logf("---\n%s\n---\n", string(pdata))
}
}

// CamelCatalogs
cats, err := camelClient.CamelV1().CamelCatalogs(ns).List(ctx, metav1.ListOptions{})
if err != nil {
Expand Down
45 changes: 45 additions & 0 deletions e2e/yaks/common/knative-broker/event-source-pipe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: event-source-pipe
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: timer-source
properties:
period: 1000
message: "Hello this is event-1!"
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: log-action
properties:
showHeaders: true
sink:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
properties:
type: org.apache.camel.event.messages
ce.override.ce-source: org.apache.camel
Loading

0 comments on commit c62573e

Please sign in to comment.