-
Notifications
You must be signed in to change notification settings - Fork 82
Set subscription readiness based on consumer group status #182
Set subscription readiness based on consumer group status #182
Conversation
Codecov Report
@@ Coverage Diff @@
## master #182 +/- ##
==========================================
- Coverage 75.99% 75.21% -0.78%
==========================================
Files 112 114 +2
Lines 4274 4459 +185
==========================================
+ Hits 3248 3354 +106
- Misses 831 901 +70
- Partials 195 204 +9
Continue to review full report at Codecov.
|
/lgtm |
/lgtm |
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go
Outdated
Show resolved
Hide resolved
Very nice! I want this to become a library usable by the Broker impl as well! (not in this PR) |
@@ -35,8 +35,5 @@ func main() { | |||
ctx = injection.WithNamespaceScope(ctx, ns) | |||
} | |||
|
|||
// Do not run the dispatcher in leader-election mode | |||
ctx = sharedmain.WithHADisabled(ctx) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you suggesting that we keep leader election disabled since this is just a read only reconciler? are there down sides of enabling it and observing how it behaves in this mode? since now we set up the consumers on all replicas with ObserveKind
and on the leader with ReconcileKind
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The end result is the same since we're doing the same thing in both ObserveKind
and ReconcileKind
but we don't really need to have replicas competing to acquire locks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is that the preference is to have HA enabled, with ReconcileKind
and ObserveKind
, over disabling HA and having replicas with ReconcileKind
since this works as intended. I might be mistaken. In all cases, I'd appreciate if this detail doesn't stop the merging of the PR if it's functionally correct.
I'm working on implementing a mix of ReconcileKind
and ObserveKind
in the dispatcher in a PoC as was suggested here knative/eventing-contrib#1446 (comment) , so I need HA eventually anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't really need to have replicas competing to acquire locks.
That. Allowing HA seems confusing to me as there is no need it.
* Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
The following is the coverage report on the affected files.
|
@@ -35,8 +35,5 @@ func main() { | |||
ctx = injection.WithNamespaceScope(ctx, ns) | |||
} | |||
|
|||
// Do not run the dispatcher in leader-election mode | |||
ctx = sharedmain.WithHADisabled(ctx) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The end result is the same since we're doing the same thing in both ObserveKind
and ReconcileKind
but we don't really need to have replicas competing to acquire locks.
func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { | ||
// Do not attempt retrying creating the client because it might be a permanent error | ||
// in which case the finalizer will never get removed. | ||
if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil { | ||
if kafkaClusterAdmin, err := r.createClient(); err == nil && r.kafkaConfig != nil { | ||
if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil { | ||
return err | ||
} | ||
} | ||
r.consumerGroupWatcher.Forget(string(kc.ObjectMeta.UID)) | ||
return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't be called on non-leader replicas, so to properly clean up resources we have to implement ReadOnlyFinalizer
or keep HA disabled and have only ReconcileKind
and FinalizeKind
.
I think the latter solution is better (less code, no wasted resource to acquire locks, etc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pierDipi sorry I got a little confused, this is the controller not the dispatcher. The controller runs in HA with a single leader, and stand-by replicas. We don't need this called on non-leader replicas cause they don't set watchers first place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was confused too :)
But I think the comment is valid (in the wrong place) since the dispatcher doesn't implement Finalizer
at all, and that's a bug that we can address later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened an issue: #245
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: devguyio, pierDipi, slinkydeveloper The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
Set subscription readiness based on consumer group status (knative#182) * WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
Set subscription readiness based on consumer group status (#182) * WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
@devguyio can you backport this to the |
…tensions#182) * WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
* WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
…tensions#182) (knative-extensions#249) * WIP: Set subscription readiness based on consumer group status * Refactor KafkaWatcher to an edge-triggered, level driven design * Refactor the test to prevent a map race * goimports and gofmt comments * Add tests for the KafkaWatcher and reconnect for AdminClient * Improve consumer group watcher code aethetics and robustness * Make variable names more descriptive for consumer group watcher and adimn client * Forget watcher when channel is deleted * Terminate old watcher and free the cache when config is created * Set the KafkaChannel consolidated dispatcher reconciler to be read-only * Enable HA for KafkaChannel(consolidated) dispatcher * Run update-codegen.sh * Optimize consumer group watcher * Use sets.String for internal cache instead of string slice * Allow for a single callback per watcher * Synchronization on Terminate and Forget
Fixes #134
Fixes #98
Fixes knative/eventing-contrib#1556
Proposed Changes
Release Note