feat: Kafka EventBus #2502
Conversation
Force-pushed from 16d142b to f53db5b.
```go
envs = append(envs, envVars...)
deploymentSpec.Template.Spec.Containers[0].Env = envs
// secrets
volSecrets, volSecretMounts := common.VolumesFromSecretsOrConfigMaps(common.SecretKeySelectorType, secretObjs...)
```
When the eventBus type is kafka, we can use the common.VolumesFromSecretsOrConfigMaps function to attach the TLS and SASL secrets. They need to be attached at the same time as the secrets in the sensor/eventsource so that any secrets with the same name are deduplicated.
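A minimal sketch of how this attachment could look, assuming `secretObjs` already includes the Kafka TLS/SASL secret selectors gathered from the eventbus config (the volume/mount appends are illustrative, not the exact controller code):

```go
// Attach all secrets in a single call so that selectors sharing a
// secret name are deduplicated into one volume.
volSecrets, volSecretMounts := common.VolumesFromSecretsOrConfigMaps(common.SecretKeySelectorType, secretObjs...)
deploymentSpec.Template.Spec.Volumes = append(deploymentSpec.Template.Spec.Volumes, volSecrets...)
deploymentSpec.Template.Spec.Containers[0].VolumeMounts = append(
	deploymentSpec.Template.Spec.Containers[0].VolumeMounts, volSecretMounts...)
```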
```diff
@@ -3,6 +3,8 @@ kind: EventSource
 metadata:
   name: e2e-durable-consumer
 spec:
+  template:
+    serviceAccountName: argo-events-sa
```
Added the argo-events-sa service account to tests, as the kafka eventsource requires a kubernetes lease and the default service account does not have permission to create these objects in the CI tests.
```diff
@@ -123,6 +123,7 @@ jobs:
       include:
         - driver: stan
         - driver: jetstream
+        - driver: kafka
```
This executes all e2e tests against kafka as well.
```diff
@@ -42,5 +42,9 @@ type EventSourceDriver interface {

 type SensorDriver interface {
 	Initialize() error
-	Connect(triggerName string, dependencyExpression string, deps []Dependency) (TriggerConnection, error)
+	Connect(ctx context.Context,
```
Added context and an atLeastOnce boolean to this interface; the stan and jetstream implementations just ignore these values.
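For reference, a plausible shape of the updated interface based on this diff, assuming the new parameters are the context and an `atLeastOnce` flag (the exact parameter order is a guess, since the hunk is truncated here):

```go
type SensorDriver interface {
	Initialize() error
	Connect(ctx context.Context,
		triggerName string,
		dependencyExpression string,
		deps []Dependency,
		atLeastOnce bool) (TriggerConnection, error)
}
```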
```diff
@@ -40,7 +40,7 @@ type LeaderCallbacks struct {

 func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int, namespace string, leasename string, hostname string) (Elector, error) {
 	switch {
-	case strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s":
+	case eventBusConfig.Kafka != nil || strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s":
 		return newKubernetesElector(namespace, leasename, hostname)
```
So, if Kafka is defined we still do leader election here? (even though it's "master/master")
We still need the leader election for the eventsource. For example, if an eventsource uses the calendar, we need active/passive so that both pods aren't emitting calendar events at the same time.
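For context, a minimal sketch of kubernetes lease-based leader election with client-go, illustrating the active/passive pattern described above (the lease name, namespace, and identity are placeholders, not the values used in this PR):

```go
import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func runElector(ctx context.Context, client kubernetes.Interface) {
	hostname, _ := os.Hostname()
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "eventsource-lease", Namespace: "argo-events"},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: hostname},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* active: start emitting events */ },
			OnStoppedLeading: func() { /* passive: another pod takes over */ },
		},
	})
}
```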
Got it. And I see it's not being called on the Sensor side.
Force-pushed from f53db5b to 48007fd.
I assume you'll be adding docs for Kafka after this PR, right?
We created a small test framework to verify correctness of the kafka eventbus under failure. The framework has a notion of chaos that randomly deletes one of the sensor pods on a schedule (every 30s, 1m, etc.). In addition to the chaos, we ran a few iterations of the tests, tweaking a few dimensions.

Under all scenarios tested we were able to verify correctness. Please note that this does not mean the trigger is always invoked; when "at least once" is specified we can verify that trigger invocations conform to this semantic, and the same for "at most once". Attached are the results.
```go
		s.Logger.Errorw("Kafka error", zap.Error(err))
		return
	}
}
```
That's interesting that all the logic is in the Sensor and not in the KafkaTriggerConnection - did it fit better to put it here?
(the Trigger-processing part that is...I know the Events are generic and all)
This implementation is top-down, which is in contrast to how it works for jetstream. When an event is consumed from a kafka topic, one of the three handler functions (Event, Trigger, Action) is invoked depending on which topic the event was consumed from. These functions then use the Triggers map (part of the KafkaSensor struct) to send these messages to each trigger that requires them.

To maintain separation, I try to keep all state in the TriggerConnection struct (the events), and the Sensor only interacts with the trigger connections by invoking the interface functions (which are a superset of the TriggerConnection interface).
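A rough sketch of the top-down dispatch described above, assuming the sarama client (type and method names are illustrative, not the PR's exact code): one handler per topic, routing through a Triggers map held on the sensor.

```go
type KafkaTriggerHandler interface {
	Event(msg *sarama.ConsumerMessage) error   // event topic: evaluate dependencies
	Trigger(msg *sarama.ConsumerMessage) error // trigger topic: hold/restore state
	Action(msg *sarama.ConsumerMessage) error  // action topic: invoke the trigger
}

type KafkaSensor struct {
	triggers                              map[string]KafkaTriggerHandler // keyed by trigger name
	eventTopic, triggerTopic, actionTopic string
}

func (s *KafkaSensor) handle(msg *sarama.ConsumerMessage) error {
	switch msg.Topic {
	case s.eventTopic:
		// an event may satisfy dependencies of multiple triggers
		for _, t := range s.triggers {
			if err := t.Event(msg); err != nil {
				return err
			}
		}
	case s.triggerTopic:
		return s.triggers[string(msg.Key)].Trigger(msg)
	case s.actionTopic:
		return s.triggers[string(msg.Key)].Action(msg)
	}
	return nil
}
```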
Thanks. I've looked at it further since my comment. It's all very elegant.
Thank you :)
Yes! Actually, let me add at least the minimum docs to this PR, and if we need more in-depth details I'm happy to follow up with another PR.
```go
for _, trigger := range s.triggers {
	offset = trigger.Offset(msg.Partition, offset)
}
```
Can this handler be called by two threads at the same time, or no? If so, are there any issues related to both threads executing lines 347-349 at the same time?
No, luckily this is not possible by construction. We use the kafka message key to look up the trigger here; a key always maps to only one partition, and all messages pertaining to the same partition are processed sequentially (and by one pod). That means we don't have to worry about concurrency in our stateful code (mostly the KafkaTriggerConnection). This is also how we enable scaling: all events that pertain to a specific trigger will land on the same partition and are guaranteed to be processed by the same pod.
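A small sketch of the keying scheme described here, assuming `producer` is a `sarama.SyncProducer` and `triggerTopic`, `triggerName`, and `payload` are in scope (illustrative, not the PR's exact code):

```go
// Using the trigger name as the message key guarantees that every message
// for a given trigger lands on the same partition, and is therefore
// consumed sequentially by a single pod.
msg := &sarama.ProducerMessage{
	Topic: triggerTopic,
	Key:   sarama.StringEncoder(triggerName), // same key -> same partition
	Value: sarama.ByteEncoder(payload),
}
partition, offset, err := producer.SendMessage(msg)
```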
Right, I guess my confusion was that we were looking at other triggers outside of our own on line 347.
I think I see what you're saying. The Offset() function only goes through events on this same partition, and all messages pertaining to the same partition are processed sequentially.
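A hedged sketch of what the Offset bookkeeping in the loop above could look like (the method name comes from the diff; the struct and its fields are illustrative): each trigger reports the lowest offset on this partition it still needs, so the sensor can commit the minimum across all triggers and never lose a held event on restart.

```go
func (c *KafkaTriggerConnection) Offset(partition int32, offset int64) int64 {
	for _, event := range c.events { // events held for an unsatisfied condition
		if event.partition == partition && event.offset < offset {
			offset = event.offset
		}
	}
	return offset
}
```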
I'm fine if you want to get this in first before the docs - up to you (and Derek).
* in-cluster kafka
* fix amd64
* add kafka clusters through kustomization
* revert arm64 change
* addressing comments
* Consolidate manifests
* Fix indentation
* webhook validations
* add Exotic Kafka installer
* Change order of EventBus reconciliation: calling client.Status().Update() before calling client.Update() persists the status for exotic eventbuses; otherwise the status is not persisted.
* requeue reconciler
* Make additional copy of obj to update eb status
* oxford comma
* stylistic change
Initial implementation of kafka eventbus sensor.
TODO: consolidate sarama config initialization into common code shared between eventbus, eventsource, and sensor.
Enables ignoring messages after a pod failure to ensure correct resiliency.
A kafka transaction is expensive; to improve throughput we can batch together incoming messages and include all the resulting output information in a single transaction.
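A hedged sketch of this batching idea using sarama's transactional producer API (available in recent sarama versions); `msgs`, `producer`, and `groupID` are hypothetical names, and real code would also produce the resulting output messages inside the same transaction:

```go
func processBatch(producer sarama.AsyncProducer, msgs <-chan *sarama.ConsumerMessage, groupID string) error {
	batch := []*sarama.ConsumerMessage{<-msgs} // wait for at least one message
	for drained := false; !drained; {
		select {
		case m := <-msgs:
			batch = append(batch, m) // fold already-buffered messages into the batch
		default:
			drained = true
		}
	}
	// one (expensive) transaction covers the whole batch
	if err := producer.BeginTxn(); err != nil {
		return err
	}
	for _, m := range batch {
		// record each consumed offset in the transaction
		if err := producer.AddMessageToTxn(m, groupID, nil); err != nil {
			_ = producer.AbortTxn()
			return err
		}
	}
	return producer.CommitTxn()
}
```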
This service account has authorization to use leases, which is required for kubernetes-based leader election.
Creating a topic with a dot gives the following warning: "WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both." Avoiding this altogether by switching to dashes.
Test against all eventbus types (nats, jetstream, kafka) and ensure that kafka secrets are attached.
Force-pushed from 1cdc903 to def91b2.
Any chance of supporting IAM MSK auth on Kafka for this? #1345 has background on the eventsource side.
I think this is something we can revisit now that we've released the Kafka EventBus. Ideally, it would be properly supported in sarama; browsing through that issue, it looks like the workarounds that have been implemented have been flaky.
Kafka EventBus
An implementation of EventBus leveraging Kafka. Supports simple conditions, complex conditions (ones containing 'and' clauses), trigger condition resets, and both at-most-once and at-least-once trigger semantics. In contrast to existing EventBus implementations, Sensors are horizontally scalable (EventSources are sometimes horizontally scalable).
The implementation uses three topics: an event topic that EventSources produce to and Sensors consume from; a trigger topic used to maintain state and ensure resiliency for triggers with complex conditions; and an action topic used to decouple event processing from trigger actions. The event topic is shared between all EventSources and Sensors wired up to the same EventBus, whereas the trigger and action topics are specific to a single Sensor.
Default naming convention
{namespace}-{eventbus}
{namespace}-{eventbus}-{sensor}-trigger
{namespace}-{eventbus}-{sensor}-action
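As a concrete (hypothetical) example, a Sensor named `my-sensor` using an EventBus named `default` in the `argo-events` namespace would get the topics:

```
argo-events-default
argo-events-default-my-sensor-trigger
argo-events-default-my-sensor-action
```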
High level architecture
For simple trigger conditions (such as `t2`) we can skip the trigger topic and publish messages directly to the action topic. For complex trigger conditions (such as `t1`) we use the trigger topic to hold on to events until the trigger condition is satisfied.

Please see my blog post for in-depth implementation details. The post poses a dilemma with the Kafka implementation that occurs under the following scenario. Imagine a Sensor with the following two triggers.
Assume the events below are received in the following order. Furthermore, assume all topics {event, trigger, action} contain only a single partition.
Trigger `t2` will be invoked once; however, because all messages land on the same partition (there is only one), the dilemma is that we cannot bump our consumer's offset, as we need to hold on to the `a` event in order to maintain resiliency. But imagine a restart occurs and we re-consume all three events, starting with event `a`. How can we ensure that we do not re-invoke `t2`? To solve this problem we opted to maintain metadata alongside the offset.

The metadata maintains a mapping of triggers to offsets, for example:
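As a hypothetical illustration (the actual encoding in the implementation may differ): if trigger `t1` still needs the event at offset 0 while `t2` has already processed everything before offset 3, the consumer commits offset 0 together with metadata along the lines of:

```
t1: 0
t2: 3
```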
If ever a restart occurs and we start consuming from offset 0, any messages pertaining to trigger `t2` are skipped until we get to offset 3, and therefore no errant trigger action is invoked.

Checklist: