-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka EventBus #10
Kafka EventBus #10
Conversation
0092baf
to
b85a505
Compare
6a7efea
to
b5d9c15
Compare
@@ -35,12 +37,12 @@ func GetEventSourceDriver(ctx context.Context, eventBusConfig eventbusv1alpha1.B | |||
|
|||
var eventBusType apicommon.EventBusType | |||
switch { | |||
case eventBusConfig.NATS != nil && eventBusConfig.JetStream != nil: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be a good check to keep in to make sure multiple specs aren't defined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But permutations increase exponentially, so it quickly gets out of control
1. eventBusConfig.NATS != nil && eventBusConfig.JetStream != nil
2. eventBusConfig.NATS != nil && eventBusConfig.KafkaStream != nil
3. eventBusConfig.JetStream != nil && eventBusConfig.KafkaStream != nil
4. eventBusConfig.NATS != nil && eventBusConfig.JetStream != nil && eventBusConfig.KafkaStream != nil
Yuck!
@@ -79,12 +83,12 @@ func GetSensorDriver(ctx context.Context, eventBusConfig eventbusv1alpha1.BusCon | |||
|
|||
var eventBusType apicommon.EventBusType | |||
switch { | |||
case eventBusConfig.NATS != nil && eventBusConfig.JetStream != nil: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above
// consumer config | ||
config.Consumer.IsolationLevel = sarama.ReadCommitted | ||
config.Consumer.Offsets.AutoCommit.Enable = false | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this Config method also set the ConsumerGroup.GroupName
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupName is specified on the consumer, not the config. You can see this here
@@ -40,7 +40,7 @@ type LeaderCallbacks struct { | |||
|
|||
func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int, namespace string, leasename string, hostname string) (Elector, error) { | |||
switch { | |||
case strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s": | |||
case eventBusConfig.Kafka != nil || strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In sensors/listener.go you mention kafka doesn't require leader election? If sources don't do leader election also, maybe return an error mentioning that in this function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eventsources still need a leader election (under some circumstances, it's a little confusing). So unfortunately we still need this for the event source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the logic that determines if the eventsource needs a leader election or not. The important thing to note is there are circumstances where an eventsource with a kafka eventbus requires a leader election.
func NewKafkaSensor(kafkaConfig *eventbusv1alpha1.KafkaConfig, sensor *sensorv1alpha1.Sensor, hostname string, logger *zap.SugaredLogger) *KafkaSensor { | ||
topics := &Topics{ | ||
event: kafkaConfig.Topic, | ||
trigger: fmt.Sprintf("%s.%s.%s", kafkaConfig.Topic, sensor.Name, "trigger"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the trigger topics, same comment as before, should it also include the namespace to include the possibility that there are two sensors in different namespaces with the same name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes definitely! The namespace is already included in the topic name itself here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think we should think through the option of the user specifying the topic names manually - do you think users may want to provide the trigger and action topic name manually too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I feel it's sufficient to (optionally) have the user specify the eventbus {event} topic and then derive the {trigger, action} topic names from the name of this topic (as I do currently)
} | ||
|
||
if _, ok := s.triggers[triggerName]; !ok { | ||
expr, err := govaluate.NewEvaluableExpression(strings.ReplaceAll(depExpression, "-", "\\-")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is line 165 NewEvaluableExpression(strings.ReplaceAll
for ? Does the argo events code do this in other places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This creates a new evaluatable expression from the string that's provided in the trigger's condition field. It's the same library as used today in jetstream here.
defer s.Disconnect() | ||
|
||
for { | ||
if len(s.triggers) != len(s.sensor.Spec.Triggers) || !s.triggers.Ready() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that the triggers would never be ready and this would loop forever. Would it be helpful to add a max wait duration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't hurt, I can add one!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I implemented this and then I realized, even if we waited a max duration and return, the function would just be re-invoked in sensors/listener.go
anyways. Furthermore, I realized that Connect and Subscribe are guaranteed to be called for each trigger (and if this happens then the if condition will be satisfied) so I don't think this is actually necessary.
return messages, offset, nil | ||
} | ||
|
||
func (s *KafkaSensor) Action(msg *sarama.ConsumerMessage) ([]*sarama.ProducerMessage, int64, func()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like for event/action/trigger the second return value is always msg.offset + 1. Maybe remove this as a return value if it's always the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not always the same, the trigger is the odd one out, it is set to the smallest required offset across all triggers. That value is set here
And then just one genreal question for the above triggering logic. Suppose you have the condition (a&&b) || (b&&c) and event b has already come in so the expression looks like (a && true) || (true && c). What will happen if in the next batch, messages a and c come. Will there be two triggers? |
The ordering of messages in a batch is guaranteed (thanks kafka) so the So stepping through this (the number preceding the event represents the offset)
|
1f24be9
to
4ccfe01
Compare
16d142b
to
f53db5b
Compare
f53db5b
to
48007fd
Compare
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: Prema devi Kuppuswamy <premadk@gmail.com> Signed-off-by: David Farr <david_farr@intuit.com>
* in-cluster kafka Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * fix amd64 * add kafka clusters through kustomization Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * revert arm64 change Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * addressing comments Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * Consolidate manifests Signed-off-by: David Farr <david_farr@intuit.com> * Fix indentation Signed-off-by: David Farr <david_farr@intuit.com> Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> Signed-off-by: David Farr <david_farr@intuit.com> Co-authored-by: David Farr <david_farr@intuit.com> Signed-off-by: David Farr <david_farr@intuit.com>
* webhook validations Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * add Exotic Kafka installer * Change order of EventBus reconcilation Calling client.Status().Update() before calling client.Update() persists the status for exotic evnetbuses, otherwise the status is not perisisted. Signed-off-by: David Farr <david_farr@intuit.com> * requeue reconciler Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> * Make additional copy of obj to update eb status Signed-off-by: David Farr <david_farr@intuit.com> * oxford comma Co-authored-by: David Farr <david_farr@intuit.com> * stylistic change Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> Signed-off-by: David Farr <david_farr@intuit.com> Co-authored-by: David Farr <david_farr@intuit.com> Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Initial implementation of kafka eventbus sensor --------- Signed-off-by: David Farr <david_farr@intuit.com> Signed-off-by: Bilal Bakht Ahmad <tringingly@gmail.com> Co-authored-by: Bilal Bakht Ahmad <tringingly@gmail.com> Signed-off-by: David Farr <david_farr@intuit.com>
TODO: consolidate sarama config initialization into common code shared between eventbus eventsource and sensor Signed-off-by: David Farr <david_farr@intuit.com>
Enables the ability to ignore messages after pod failure to ensure correct resiliency. Signed-off-by: David Farr <david_farr@intuit.com>
A kafka transaction is expensive, to improve throughput we can batch together incoming messages and include all the resulting output information in a single transaction. Signed-off-by: David Farr <david_farr@intuit.com>
This service account has authorization to use leases, which is required for kubernetes based leader election. Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: Prema devi Kuppuswamy <premadk@gmail.com> Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Creating a topic with a dot gives the following warning WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Avoiding this altogether by switching to dashes. Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
Test against all eventbus types (nats, jetstream, kafka) and ensure that kafka secrets are attached. Signed-off-by: David Farr <david_farr@intuit.com>
Signed-off-by: David Farr <david_farr@intuit.com>
1cdc903
to
def91b2
Compare
Signed-off-by: David Farr <david_farr@intuit.com>
Implement Kafka EventBus