Skip to content

Commit

Permalink
fix: Improve Knative trigger filter
Browse files Browse the repository at this point in the history
- Fixes #5537: Support filter attributes other than event type (e.g. source, subject, extensions)
- Fixes #5529: Allows empty filter to consume the full event stream
- Fixes #5446: Knative Trigger creation is only based on event type attribute
- Fixes #5577: Consistently support "cloudEventsType" property in Pipes source/sink
- Update documentation and improve Knative Kamelet/Pipe user guide
  • Loading branch information
christophd committed Jun 6, 2024
1 parent 9fd6d50 commit d98000c
Show file tree
Hide file tree
Showing 23 changed files with 892 additions and 78 deletions.
117 changes: 106 additions & 11 deletions docs/modules/ROOT/pages/kamelets/kamelets-user.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,19 +227,19 @@ doing additional processing (such as tranformations or other enterprise integrat
A common use case is that of **Knative Sources**, for which the Apache Camel developers maintain the https://knative.dev/docs/eventing/samples/apache-camel-source/[Knative CamelSources].
Kamelets represent an **evolution** of the model proposed in CamelSources, but they allow using the same declarative style of binding, via a resource named **Pipe**.

=== Binding to a Knative Destination
=== Binding to Knative

A Pipe allows to declaratively move data from a system described by a Kamelet towards a Knative destination (or other kind of destinations, in the future), or from
a Knative channel/broker to another external system described by a Kamelet.
A Pipe allows to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet.
This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way.

For example, here's an example of binding:
For example, here is a pipe that connects a Kamelet Telegram source to the Knative broker:

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-text-source-to-channel
name: telegram-to-knative
spec:
source: # <1>
ref:
Expand All @@ -250,23 +250,118 @@ spec:
botToken: the-token-here
sink: # <2>
ref:
kind: InMemoryChannel
apiVersion: messaging.knative.dev/v1
name: messages
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
----
<1> Reference to the source that provides data
<2> Reference to the sink where data should be sent to

This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and
makes sure that messages produced by the Kamelet are forwarded to the Knative **InMemoryChannel** named "messages".
makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default".

Note that source and sink are specified as standard **Kubernetes object references** in a declarative way.

Knative eventing uses CloudEvents data format by default.
You may want to set some properties that specify the event attributes such as the event type.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-to-knative
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: telegram-text-source
properties:
botToken: the-token-here
sink:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
properties:
type: org.apache.camel.telegram.events # <1>
----
<1> Sets the event type attribute of the CloudEvent produced by this Pipe

Note that source and sink are specified declaratively as standard **Kubernetes object references**.
This way you may specify event attributes before publishing to the Knative broker.
Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel.

The example shows how we can reference the "telegram-text-source" resource in a Pipe. It's contained in the `source` section
because it's a Kamelet of type "source".
A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`.

**Under the covers, a Pipe creates an Integration** resource that implements the binding, but this is transparent to the end user.
**Under the covers, a Pipe creates an Integration** resource that implements the binding, but all details of how to connect with
Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a `SinkBinding` concept
under the covers in order to retrieve the Knative broker endpoint URL.

In the same way you can also connect a Kamelet source to a Knative channel.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: telegram-to-knative-channel
spec:
source: # <1>
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: telegram-text-source
properties:
botToken: the-token-here
sink: # <2>
ref:
kind: InMemoryChannel
apiVersion: messaging.knative.dev/v1
name: messages
----
<1> Reference to the source that provides data
<2> Reference to the Knative channel that acts as the sink where data should be sent to

When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe.
Events consumed from Knative event stream will be pushed to the given sink of the Pipe.

[source,yaml]
----
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: knative-to-slack
spec:
source: # <1>
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
properties:
type: org.apache.camel.event.messages
sink: # <2>
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: slack-sink
properties:
channel: "#my-channel"
webhookUrl: the-webhook-url
----
<1> Reference to the Knative broker source that provides data
<2> Reference to the sink where data should be sent to

Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing.
In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API.

When consuming events from the Knative broker you most likely need to filter and select the events to process.
You can do that with the properties set on the Knative broker source reference.
The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions.

In the background Camel K will automatically create a Knative Trigger for the Pipe that uses the filter attributes accordingly.

=== Binding to a Kafka Topic

Expand Down
18 changes: 18 additions & 0 deletions docs/modules/ROOT/partials/apis/camel-k-crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7737,6 +7737,24 @@ Enables the camel-k-operator to set the "bindings.knative.dev/include=true" labe
As Knative requires this label to perform injection of K_SINK URL into the service.
If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true)
|`filters` +
[]string
|
Sets filter attributes on the event stream (such as event type, source, subject and so on).
A list of key-value pairs that represent filter attributes and its values.
The syntax is KEY=VALUE, e.g., `source="my.source"`.
Filter attributes get set on the Knative trigger that is being created as part of this integration.
|`filterEventType` +
bool
|
Enables the default filtering for the Knative trigger using the event type
If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true)
|===
Expand Down
12 changes: 12 additions & 0 deletions docs/modules/traits/pages/knative.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ It's enabled by default when the integration targets a single sink
As Knative requires this label to perform injection of K_SINK URL into the service.
If this is false, the integration pod may start and fail, read the SinkBinding Knative documentation. (default: true)

| knative.filters
| []string
| Sets filter attributes on the event stream (such as event type, source, subject and so on).
A list of key-value pairs that represent filter attributes and its values.
The syntax is KEY=VALUE, e.g., `source="my.source"`.
Filter attributes get set on the Knative trigger that is being created as part of this integration.

| knative.filter-event-type
| bool
| Enables the default filtering for the Knative trigger using the event type
If this is true, the created Knative trigger uses the event type as a filter on the event stream when no other filter criteria is given. (default: true)

|===

// End of autogenerated code - DO NOT EDIT! (configuration)
34 changes: 34 additions & 0 deletions helm/camel-k/crds/crd-integration-platform.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1395,11 +1395,28 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the event
stream when no other filter criteria is given. (default:
true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the header
"ce-knativehistory". Since this header has been removed
in newer versions of Knative, filtering is disabled by default.
type: boolean
filters:
description: Sets filter attributes on the event stream (such
as event type, source, subject and so on). A list of key-value
pairs that represent filter attributes and its values. The
syntax is KEY=VALUE, e.g., `source="my.source"`. Filter
attributes get set on the Knative trigger that is being
created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true"
label to the namespace As Knative requires this label to
Expand Down Expand Up @@ -3392,11 +3409,28 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the event
stream when no other filter criteria is given. (default:
true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the header
"ce-knativehistory". Since this header has been removed
in newer versions of Knative, filtering is disabled by default.
type: boolean
filters:
description: Sets filter attributes on the event stream (such
as event type, source, subject and so on). A list of key-value
pairs that represent filter attributes and its values. The
syntax is KEY=VALUE, e.g., `source="my.source"`. Filter
attributes get set on the Knative trigger that is being
created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true"
label to the namespace As Knative requires this label to
Expand Down
34 changes: 34 additions & 0 deletions helm/camel-k/crds/crd-integration-profile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1272,11 +1272,28 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the event
stream when no other filter criteria is given. (default:
true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the header
"ce-knativehistory". Since this header has been removed
in newer versions of Knative, filtering is disabled by default.
type: boolean
filters:
description: Sets filter attributes on the event stream (such
as event type, source, subject and so on). A list of key-value
pairs that represent filter attributes and its values. The
syntax is KEY=VALUE, e.g., `source="my.source"`. Filter
attributes get set on the Knative trigger that is being
created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true"
label to the namespace As Knative requires this label to
Expand Down Expand Up @@ -3152,11 +3169,28 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the event
stream when no other filter criteria is given. (default:
true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the header
"ce-knativehistory". Since this header has been removed
in newer versions of Knative, filtering is disabled by default.
type: boolean
filters:
description: Sets filter attributes on the event stream (such
as event type, source, subject and so on). A list of key-value
pairs that represent filter attributes and its values. The
syntax is KEY=VALUE, e.g., `source="my.source"`. Filter
attributes get set on the Knative trigger that is being
created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true"
label to the namespace As Knative requires this label to
Expand Down
17 changes: 17 additions & 0 deletions helm/camel-k/crds/crd-integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7336,11 +7336,28 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the event
stream when no other filter criteria is given. (default:
true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the header
"ce-knativehistory". Since this header has been removed
in newer versions of Knative, filtering is disabled by default.
type: boolean
filters:
description: Sets filter attributes on the event stream (such
as event type, source, subject and so on). A list of key-value
pairs that represent filter attributes and its values. The
syntax is KEY=VALUE, e.g., `source="my.source"`. Filter
attributes get set on the Knative trigger that is being
created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the "bindings.knative.dev/include=true"
label to the namespace As Knative requires this label to
Expand Down
17 changes: 17 additions & 0 deletions helm/camel-k/crds/crd-kamelet-binding.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7624,12 +7624,29 @@ spec:
items:
type: string
type: array
filterEventType:
description: 'Enables the default filtering for the Knative
trigger using the event type If this is true, the created
Knative trigger uses the event type as a filter on the
event stream when no other filter criteria is given.
(default: true)'
type: boolean
filterSourceChannels:
description: Enables filtering on events based on the
header "ce-knativehistory". Since this header has been
removed in newer versions of Knative, filtering is disabled
by default.
type: boolean
filters:
description: Sets filter attributes on the event stream
(such as event type, source, subject and so on). A list
of key-value pairs that represent filter attributes
and its values. The syntax is KEY=VALUE, e.g., `source="my.source"`.
Filter attributes get set on the Knative trigger that
is being created as part of this integration.
items:
type: string
type: array
namespaceLabel:
description: 'Enables the camel-k-operator to set the
"bindings.knative.dev/include=true" label to the namespace
Expand Down
Loading

0 comments on commit d98000c

Please sign in to comment.