From eaa591689dfd24be3ddaa434487bc2362e58e62f Mon Sep 17 00:00:00 2001
From: Ahmed Abdalla Abdelrehim
Date: Tue, 1 Dec 2020 17:10:37 +0100
Subject: [PATCH] Port of https://github.com/knative-sandbox/eventing-kafka/pull/182 (#1672)

Set subscription readiness based on consumer group status (#182)

* WIP: Set subscription readiness based on consumer group status
* Refactor KafkaWatcher to an edge-triggered, level-driven design
* Refactor the test to prevent a map race
* goimports and gofmt comments
* Add tests for the KafkaWatcher and reconnect for AdminClient
* Improve consumer group watcher code aesthetics and robustness
* Make variable names more descriptive for consumer group watcher and admin client
* Forget watcher when channel is deleted
* Terminate old watcher and free the cache when config is created
* Set the KafkaChannel consolidated dispatcher reconciler to be read-only
* Enable HA for KafkaChannel(consolidated) dispatcher
* Run update-codegen.sh
* Optimize consumer group watcher
* Use sets.String for internal cache instead of string slice
* Allow for a single callback per watcher
* Synchronization on Terminate and Forget
---
 kafka/channel/cmd/channel_dispatcher/main.go  |   3 -
 kafka/channel/pkg/kafka/admin.go              | 108 ++++++++++
 kafka/channel/pkg/kafka/admin_test.go         |  94 +++++++++
 .../controller/consumer_group_watcher.go      | 198 ++++++++++++++++++
 .../controller/consumer_group_watcher_test.go |  84 ++++++++
 .../pkg/reconciler/controller/kafkachannel.go | 128 ++++++++---
 .../controller/kafkachannel_test.go           |  44 +++-
 .../pkg/reconciler/dispatcher/kafkachannel.go |  40 ++--
 .../pkg/reconciler/testing/kafkachannel.go    |  25 +++
 9 files changed, 662 insertions(+), 62 deletions(-)
 create mode 100644 kafka/channel/pkg/kafka/admin.go
 create mode 100644 kafka/channel/pkg/kafka/admin_test.go
 create mode 100644 kafka/channel/pkg/reconciler/controller/consumer_group_watcher.go
 create mode 100644 kafka/channel/pkg/reconciler/controller/consumer_group_watcher_test.go

diff --git a/kafka/channel/cmd/channel_dispatcher/main.go b/kafka/channel/cmd/channel_dispatcher/main.go
index 1808d922ef..d74869935d 100644
--- a/kafka/channel/cmd/channel_dispatcher/main.go
+++ b/kafka/channel/cmd/channel_dispatcher/main.go
@@ -35,8 +35,5 @@ func main() {
 		ctx = injection.WithNamespaceScope(ctx, ns)
 	}
 
-	// Do not run the dispatcher in leader-election mode
-	ctx = sharedmain.WithHADisabled(ctx)
-
 	sharedmain.MainWithContext(ctx, component, controller.NewController)
 }
diff --git a/kafka/channel/pkg/kafka/admin.go b/kafka/channel/pkg/kafka/admin.go
new file mode 100644
index 0000000000..222c54be6c
--- /dev/null
+++ b/kafka/channel/pkg/kafka/admin.go
@@ -0,0 +1,108 @@
+/*
+Copyright 2020 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"go.uber.org/zap"
+	"k8s.io/apimachinery/pkg/util/sets"
+
+	"github.com/Shopify/sarama"
+	"knative.dev/pkg/logging"
+)
+
+var mutex sync.Mutex
+
+type ClusterAdminFactory func() (sarama.ClusterAdmin, error)
+
+type AdminClient interface {
+	// ListConsumerGroups Lists the consumer groups
+	ListConsumerGroups() ([]string, error)
+}
+
+// AdminClientManager manages a ClusterAdmin connection and recreates one when needed.
+// It is made to overcome https://github.com/Shopify/sarama/issues/1162
+type AdminClientManager struct {
+	logger       *zap.SugaredLogger
+	adminFactory ClusterAdminFactory
+	clusterAdmin sarama.ClusterAdmin
+}
+
+func NewAdminClient(ctx context.Context, caFactory ClusterAdminFactory) (AdminClient, error) {
+	logger := logging.FromContext(ctx)
+	logger.Info("Creating a new AdminClient")
+	kafkaClusterAdmin, err := caFactory()
+	if err != nil {
+		logger.Errorw("error while creating ClusterAdmin", zap.Error(err))
+		return nil, err
+	}
+	return &AdminClientManager{
+		logger:       logger,
+		adminFactory: caFactory,
+		clusterAdmin: kafkaClusterAdmin,
+	}, nil
+}
+
+// ListConsumerGroups Returns a list of the consumer groups.
+//
+// In the event of errors, there will be a retry with an exponential backoff.
+// Due to a known issue in Sarama ClusterAdmin https://github.com/Shopify/sarama/issues/1162,
+// a new ClusterAdmin will be created with every retry until the call succeeds or
+// the timeout is reached.
+func (c *AdminClientManager) ListConsumerGroups() ([]string, error) {
+	c.logger.Info("Attempting to list consumer group")
+	mutex.Lock()
+	defer mutex.Unlock()
+	r := 0
+	// This gives us around ~13min of exponential backoff
+	max := 13
+	cgsMap, err := c.clusterAdmin.ListConsumerGroups()
+	for err != nil && r <= max {
+		// There's an error, let's retry and presume a new ClusterAdmin can fix it
+
+		// Calculate incremental delay following this https://docs.aws.amazon.com/general/latest/gr/api-retries.html
+		t := int(math.Pow(2, float64(r)) * 100)
+		d := time.Duration(t) * time.Millisecond
+		c.logger.Errorw("listing consumer group failed. Refreshing the ClusterAdmin and retrying.",
+			zap.Error(err),
+			zap.Duration("retry after", d),
+			zap.Int("Retry attempt", r),
+			zap.Int("Max retries", max),
+		)
+		time.Sleep(d)
+
+		// let's reconnect and try again
+		c.clusterAdmin, err = c.adminFactory()
+		r += 1
+		if err != nil {
+			// skip this attempt
+			continue
+		}
+		cgsMap, err = c.clusterAdmin.ListConsumerGroups()
+	}
+
+	if r > max {
+		return nil, fmt.Errorf("failed to refresh the cluster admin and retry: %v", err)
+	}
+
+	return sets.StringKeySet(cgsMap).List(), nil
+}
diff --git a/kafka/channel/pkg/kafka/admin_test.go b/kafka/channel/pkg/kafka/admin_test.go
new file mode 100644
index 0000000000..1bcd58776f
--- /dev/null
+++ b/kafka/channel/pkg/kafka/admin_test.go
@@ -0,0 +1,94 @@
+/*
+Copyright 2020 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package kafka + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/Shopify/sarama" + + pkgtesting "knative.dev/pkg/logging/testing" +) + +const testCG = "cg1" + +var m sync.RWMutex + +type FakeClusterAdmin struct { + sarama.ClusterAdmin + faulty bool +} + +func (f *FakeClusterAdmin) ListConsumerGroups() (map[string]string, error) { + cgs := map[string]string{ + testCG: "cg", + } + m.RLock() + defer m.RUnlock() + if f.faulty { + return nil, fmt.Errorf("Error") + } + return cgs, nil +} + +func TestAdminClient(t *testing.T) { + var wg sync.WaitGroup + wg.Add(10) + ctx := pkgtesting.TestContextWithLogger(t) + f := &FakeClusterAdmin{} + ac, err := NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) { + return f, nil + }) + if err != nil { + t.Error("failed to obtain new client", err) + } + for i := 0; i < 10; i += 1 { + go func() { + doList(t, ac) + check := make(chan struct{}) + go func() { + m.Lock() + f.faulty = true + m.Unlock() + check <- struct{}{} + time.Sleep(2 * time.Second) + m.Lock() + f.faulty = false + m.Unlock() + check <- struct{}{} + }() + <-check + doList(t, ac) + <-check + wg.Done() + }() + } + wg.Wait() +} + +func doList(t *testing.T, ac AdminClient) { + cgs, _ := ac.ListConsumerGroups() + if len(cgs) != 1 { + t.Fatalf("list consumer group: got %d, want %d", len(cgs), 1) + } + if cgs[0] != testCG { + t.Fatalf("consumer group: got %s, want %s", cgs[0], testCG) + } +} diff --git a/kafka/channel/pkg/reconciler/controller/consumer_group_watcher.go b/kafka/channel/pkg/reconciler/controller/consumer_group_watcher.go new file mode 100644 index 0000000000..7d3b13d0b4 --- /dev/null +++ b/kafka/channel/pkg/reconciler/controller/consumer_group_watcher.go @@ -0,0 +1,198 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "sync" + "time" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/pkg/logging" + + "knative.dev/eventing-contrib/kafka/channel/pkg/kafka" +) + +var ( + watchersMtx sync.RWMutex + cacheMtx sync.RWMutex + // Hooks into the poll logic for testing + after = time.After + done = func() {} +) + +type ConsumerGroupHandler func() +type Matcher func(string) bool + +type ConsumerGroupWatcher interface { + // Start instructs the watcher to start polling for the consumer groups and + // notify any observers on the event of any changes + Start() error + + // Terminate instructs the watcher to stop polling and clear the watchers cache + Terminate() + + // Watch registers callback on the event of any changes observed + // on the consumer groups. watcherID is an arbitrary string the user provides + // that will be used to identify his callbacks when provided to Forget(watcherID). + // + // To ensure this is event-triggered, level-driven, + // we don't pass the updates to the callback, instead the observer is expected + // to use List() to get the updated list of ConsumerGroups. 
+ Watch(watcherID string, callback ConsumerGroupHandler) error + + // Forget removes all callbacks that correspond to the watcherID + Forget(watcherID string) + + // List returns all the cached consumer groups that match matcher. + // It will return an empty slice if none matched or the cache is empty + List(matcher Matcher) []string +} + +type WatcherImpl struct { + logger *zap.SugaredLogger + //TODO name? + watchers map[string]ConsumerGroupHandler + cachedConsumerGroups sets.String + adminClient kafka.AdminClient + pollDuration time.Duration + done chan struct{} +} + +func NewConsumerGroupWatcher(ctx context.Context, ac kafka.AdminClient, pollDuration time.Duration) ConsumerGroupWatcher { + return &WatcherImpl{ + logger: logging.FromContext(ctx), + adminClient: ac, + pollDuration: pollDuration, + watchers: make(map[string]ConsumerGroupHandler), + cachedConsumerGroups: sets.String{}, + } +} + +func (w *WatcherImpl) Start() error { + w.logger.Infow("ConsumerGroupWatcher starting. Polling for consumer groups", zap.Duration("poll duration", w.pollDuration)) + go func() { + for { + select { + case <-after(w.pollDuration): + // let's get current observed consumer groups + observedCGs, err := w.adminClient.ListConsumerGroups() + if err != nil { + w.logger.Errorw("error while listing consumer groups", zap.Error(err)) + continue + } + var notify bool + var changedGroup string + observedCGsSet := sets.String{}.Insert(observedCGs...) + // Look for observed CGs + for c := range observedCGsSet { + if !w.cachedConsumerGroups.Has(c) { + // This is the first appearance. + w.logger.Debugw("Consumer group observed. Caching.", + zap.String("consumer group", c)) + changedGroup = c + notify = true + break + } + } + // Look for disappeared CGs + for c := range w.cachedConsumerGroups { + if !observedCGsSet.Has(c) { + // This CG was cached but it's no longer there. 
+ w.logger.Debugw("Consumer group deleted.", + zap.String("consumer group", c)) + changedGroup = c + notify = true + break + } + } + if notify { + cacheMtx.Lock() + w.cachedConsumerGroups = observedCGsSet + cacheMtx.Unlock() + w.notify(changedGroup) + } + done() + case <-w.done: + break + } + } + }() + return nil +} + +func (w *WatcherImpl) Terminate() { + watchersMtx.Lock() + cacheMtx.Lock() + defer watchersMtx.Unlock() + defer cacheMtx.Unlock() + + w.watchers = nil + w.cachedConsumerGroups = nil + w.done <- struct{}{} +} + +// TODO explore returning a channel instead of a taking callback +func (w *WatcherImpl) Watch(watcherID string, cb ConsumerGroupHandler) error { + w.logger.Debugw("Adding a new watcher", zap.String("watcherID", watcherID)) + watchersMtx.Lock() + defer watchersMtx.Unlock() + w.watchers[watcherID] = cb + + // notify at least once to get the current state + cb() + return nil +} + +func (w *WatcherImpl) Forget(watcherID string) { + w.logger.Debugw("Forgetting watcher", zap.String("watcherID", watcherID)) + watchersMtx.Lock() + defer watchersMtx.Unlock() + delete(w.watchers, watcherID) +} + +func (w *WatcherImpl) List(matcher Matcher) []string { + w.logger.Debug("Listing consumer groups") + cacheMtx.RLock() + defer cacheMtx.RUnlock() + cgs := make([]string, 0) + for cg := range w.cachedConsumerGroups { + if matcher(cg) { + cgs = append(cgs, cg) + } + } + return cgs +} + +func (w *WatcherImpl) notify(cg string) { + watchersMtx.RLock() + defer watchersMtx.RUnlock() + + for _, cb := range w.watchers { + cb() + } +} + +func Find(list []string, item string) bool { + for _, i := range list { + if i == item { + return true + } + } + return false +} diff --git a/kafka/channel/pkg/reconciler/controller/consumer_group_watcher_test.go b/kafka/channel/pkg/reconciler/controller/consumer_group_watcher_test.go new file mode 100644 index 0000000000..e543c01ced --- /dev/null +++ b/kafka/channel/pkg/reconciler/controller/consumer_group_watcher_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + + pkgtesting "knative.dev/pkg/logging/testing" +) + +//TODO how to mock the sarama AdminClient +type FakeClusterAdmin struct { + mutex sync.RWMutex + cgs sets.String +} + +func (fake *FakeClusterAdmin) ListConsumerGroups() ([]string, error) { + fake.mutex.RLock() + defer fake.mutex.RUnlock() + return fake.cgs.List(), nil +} + +func (fake *FakeClusterAdmin) deleteCG(cg string) { + fake.mutex.Lock() + defer fake.mutex.Unlock() + fake.cgs.Delete(cg) +} + +func TestKafkaWatcher(t *testing.T) { + cgname := "kafka.event-example.default-kne-trigger.0d9c4383-1e68-42b5-8c3a-3788274404c5" + wid := "channel-abc" + cgs := sets.String{} + cgs.Insert(cgname) + ca := FakeClusterAdmin{ + cgs: cgs, + } + + ch := make(chan sets.String, 1) + + w := NewConsumerGroupWatcher(pkgtesting.TestContextWithLogger(t), &ca, 2*time.Second) + w.Watch(wid, func() { + cgs := w.List(func(cg string) bool { + return cgname == cg + }) + result := sets.String{} + result.Insert(cgs...) + ch <- result + }) + + w.Start() + <-ch + assertSync(t, ch, cgs) + ca.deleteCG(cgname) + assertSync(t, ch, sets.String{}) +} + +func assertSync(t *testing.T, ch chan sets.String, cgs sets.String) { + select { + case syncedCGs := <-ch: + if !syncedCGs.Equal(cgs) { + t.Errorf("observed and expected consumer groups do not match. got %v expected %v", syncedCGs, cgs) + } + case <-time.After(6 * time.Second): + t.Errorf("timedout waiting for consumer groups to sync") + } +} diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index 1d144b0a72..21ddc01bf2 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -20,44 +20,44 @@ import ( "context" "errors" "fmt" - - source "knative.dev/eventing-contrib/kafka" - - "k8s.io/utils/pointer" + "strings" + "time" "github.com/Shopify/sarama" - "go.uber.org/zap" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" appsv1listers "k8s.io/client-go/listers/apps/v1" corev1listers "k8s.io/client-go/listers/core/v1" rbacv1listers "k8s.io/client-go/listers/rbac/v1" "k8s.io/client-go/tools/cache" + "k8s.io/utils/pointer" - "knative.dev/eventing/pkg/apis/eventing" - eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" - "knative.dev/eventing/pkg/reconciler/names" - "knative.dev/pkg/logging" - - "knative.dev/pkg/apis" - "knative.dev/pkg/controller" - pkgreconciler "knative.dev/pkg/reconciler" - + source "knative.dev/eventing-contrib/kafka" "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1beta1" kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme" kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1beta1" + "knative.dev/eventing-contrib/kafka/channel/pkg/kafka" "knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources" 
"knative.dev/eventing-contrib/kafka/channel/pkg/utils" + v1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/eventing" + eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" + "knative.dev/eventing/pkg/reconciler/names" + "knative.dev/pkg/apis" + "knative.dev/pkg/apis/duck" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + pkgreconciler "knative.dev/pkg/reconciler" ) const ( @@ -75,6 +75,8 @@ const ( dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated" dispatcherName = "kafka-ch-dispatcher" + + pollInterval = 2 * time.Second ) func newReconciledNormal(namespace, name string) pkgreconciler.Event { @@ -116,11 +118,10 @@ type Reconciler struct { kafkaAuthConfig *utils.KafkaAuthConfig kafkaConfigError error kafkaClientSet kafkaclientset.Interface - // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. - kafkaClusterAdmin sarama.ClusterAdmin - + kafkaClusterAdmin sarama.ClusterAdmin + consumerGroupWatcher ConsumerGroupWatcher kafkachannelLister listers.KafkaChannelLister kafkachannelInformer cache.SharedIndexInformer deploymentLister appsv1listers.DeploymentLister @@ -162,7 +163,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return r.kafkaConfigError } - kafkaClusterAdmin, err := r.createClient(ctx, kc) + kafkaClusterAdmin, err := r.createClient() if err != nil { kc.Status.MarkConfigFailed("InvalidConfiguration", "Unable to build Kafka admin client for channel %s: %v", kc.Name, err) return err @@ -177,7 +178,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel // 4. Dispatcher endpoints to ensure that there's something backing the Service. // 5. K8s service representing the channel that will use ExternalName to point to the Dispatcher k8s service. 
- if err := r.createTopic(ctx, kc, kafkaClusterAdmin); err != nil { + if err := r.reconcileTopic(ctx, kc, kafkaClusterAdmin); err != nil { kc.Status.MarkTopicFailed("TopicCreateFailed", "error while creating topic: %s", err) return err } @@ -238,7 +239,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel Scheme: "http", Host: names.ServiceHostName(svc.Name, svc.Namespace), }) - + err = r.setupSubscriptionStatusWatcher(ctx, kc) + if err != nil { + logger.Errorw("error setting up some subscription status watchers", zap.Error(err)) + } // close the connection err = kafkaClusterAdmin.Close() if err != nil { @@ -251,6 +255,60 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return newReconciledNormal(kc.Namespace, kc.Name) } +func (r *Reconciler) setupSubscriptionStatusWatcher(ctx context.Context, channel *v1beta1.KafkaChannel) error { + var err error + groupIDPrefix := fmt.Sprintf("kafka.%s.%s", channel.Namespace, channel.Name) + + m := func(cg string) bool { + return strings.HasPrefix(cg, groupIDPrefix) + } + err = r.consumerGroupWatcher.Watch(string(channel.ObjectMeta.UID), func() { + err := r.markSubscriptionReadiness(ctx, channel, r.consumerGroupWatcher.List(m)) + if err != nil { + logging.FromContext(ctx).Errorw("error updating subscription readiness", zap.Error(err)) + } + }) + return err +} + +func (r *Reconciler) markSubscriptionReadiness(ctx context.Context, ch *v1beta1.KafkaChannel, cgs []string) error { + after := ch.DeepCopy() + after.Status.Subscribers = make([]v1.SubscriberStatus, 0) + + for _, s := range ch.Spec.Subscribers { + cg := fmt.Sprintf("kafka.%s.%s.%s", ch.Namespace, ch.Name, s.UID) + if Find(cgs, cg) { + logging.FromContext(ctx).Debugw("marking subscription", zap.Any("subscription", s)) + after.Status.Subscribers = append(after.Status.Subscribers, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionTrue, + }) + } + } + + jsonPatch, err := duck.CreatePatch(ch, after) + if err != nil { + return fmt.Errorf("creating JSON patch: %w", err) + } + // If there is nothing to patch, we are good, just return. + // Empty patch is [], hence we check for that. + if len(jsonPatch) == 0 { + return nil + } + patch, err := jsonPatch.MarshalJSON() + if err != nil { + return fmt.Errorf("marshaling JSON patch: %w", err) + } + patched, err := r.kafkaClientSet.MessagingV1beta1().KafkaChannels(ch.Namespace).Patch(ctx, ch.Name, types.JSONPatchType, patch, metav1.PatchOptions{}, "status") + + if err != nil { + return fmt.Errorf("Failed patching: %w", err) + } + logging.FromContext(ctx).Debugw("Patched resource", zap.Any("patch", patch), zap.Any("patched", patched)) + return nil +} + func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, dispatcherNamespace string, kc *v1beta1.KafkaChannel) (*appsv1.Deployment, error) { if scope == scopeNamespace { // Configure RBAC in namespace to access the configmaps @@ -464,7 +522,7 @@ func (r *Reconciler) reconcileChannelService(ctx context.Context, dispatcherName return svc, nil } -func (r *Reconciler) createClient(ctx context.Context, kc *v1beta1.KafkaChannel) (sarama.ClusterAdmin, error) { +func (r *Reconciler) createClient() (sarama.ClusterAdmin, error) { // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. 
// Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently @@ -480,7 +538,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1beta1.KafkaChannel) return kafkaClusterAdmin, nil } -func (r *Reconciler) createTopic(ctx context.Context, channel *v1beta1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { +func (r *Reconciler) reconcileTopic(ctx context.Context, channel *v1beta1.KafkaChannel, kafkaClusterAdmin sarama.ClusterAdmin) error { logger := logging.FromContext(ctx) topicName := utils.TopicName(utils.KafkaChannelSeparator, channel.Namespace, channel.Name) @@ -516,10 +574,11 @@ func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1beta1.KafkaChan } func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.ConfigMap) { - logging.FromContext(ctx).Info("Reloading Kafka configuration") + logger := logging.FromContext(ctx) + logger.Info("Reloading Kafka configuration") kafkaConfig, err := utils.GetKafkaConfig(configMap.Data) if err != nil { - logging.FromContext(ctx).Errorw("Error reading Kafka configuration", zap.Error(err)) + logger.Errorw("Error reading Kafka configuration", zap.Error(err)) } if kafkaConfig.AuthSecretName != "" { @@ -530,15 +589,32 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co // Eventually the previous config should be snapshotted to delete Kafka topics r.kafkaConfig = kafkaConfig r.kafkaConfigError = err + ac, err := kafka.NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) { + return source.MakeAdminClient(controllerAgentName, r.kafkaAuthConfig, kafkaConfig.Brokers) + }) + + if err != nil { + logger.Errorw("Error creating AdminClient", zap.Error(err)) + return + } + + if r.consumerGroupWatcher != nil { + r.consumerGroupWatcher.Terminate() + } + + r.consumerGroupWatcher = NewConsumerGroupWatcher(ctx, ac, pollInterval) + //TODO handle error + r.consumerGroupWatcher.Start() } func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { // Do not attempt retrying creating the client because it might be a permanent error // in which case the finalizer will never get removed. 
- if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil { + if kafkaClusterAdmin, err := r.createClient(); err == nil && r.kafkaConfig != nil { if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil { return err } } + r.consumerGroupWatcher.Forget(string(kc.ObjectMeta.UID)) return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer } diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index 95509a17a6..c0b109baf9 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -20,6 +20,10 @@ import ( "context" "fmt" "testing" + "time" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/pkg/apis" "github.com/Shopify/sarama" @@ -59,6 +63,8 @@ const ( channelServiceAddress = "test-kc-kn-channel.test-namespace.svc.cluster.local" brokerName = "test-broker" finalizerName = "kafkachannels.messaging.knative.dev" + sub1UID = "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1" + sub2UID = "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1" ) var ( @@ -236,12 +242,14 @@ func TestAllCases(t *testing.T) { makeService(), makeReadyEndpoints(), reconcilekafkatesting.NewKafkaChannel(kcName, testNS, + reconcilertesting.WithKafkaChannelSubscribers(subscribers()), reconcilekafkatesting.WithKafkaFinalizer(finalizerName)), makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)), }, WantErr: false, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS, + reconcilertesting.WithKafkaChannelSubscribers(subscribers()), reconcilekafkatesting.WithInitKafkaChannelConditions, reconcilekafkatesting.WithKafkaFinalizer(finalizerName), reconcilekafkatesting.WithKafkaChannelConfigReady(), @@ -327,7 +335,8 @@ func TestAllCases(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - kafkachannelLister: listers.GetKafkaChannelLister(), + consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -384,7 +393,8 @@ func TestTopicExists(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - kafkachannelLister: listers.GetKafkaChannelLister(), + consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -453,7 +463,8 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - kafkachannelLister: listers.GetKafkaChannelLister(), + consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -522,7 +533,8 @@ func TestDeploymentZeroReplicas(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - kafkachannelLister: listers.GetKafkaChannelLister(), + consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: 
listers.GetDeploymentLister(), @@ -588,7 +600,8 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - kafkachannelLister: listers.GetKafkaChannelLister(), + consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -683,7 +696,11 @@ func (ca *mockClusterAdmin) DeleteACL(filter sarama.AclFilter, validateOnly bool } func (ca *mockClusterAdmin) ListConsumerGroups() (map[string]string, error) { - return nil, nil + cgs := map[string]string{ + fmt.Sprintf("kafka.%s.%s.%s", kcName, testNS, sub1UID): "consumer", + fmt.Sprintf("kafka.%s.%s.%s", kcName, testNS, sub2UID): "consumer", + } + return cgs, nil } func (ca *mockClusterAdmin) DescribeConsumerGroups(groups []string) ([]*sarama.GroupDescription, error) { @@ -797,3 +814,18 @@ func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { action.Patch = []byte(patch) return action } + +func subscribers() []eventingduckv1.SubscriberSpec { + + return []eventingduckv1.SubscriberSpec{{ + UID: sub1UID, + Generation: 1, + SubscriberURI: apis.HTTP("call1"), + ReplyURI: apis.HTTP("sink2"), + }, { + UID: sub2UID, + Generation: 2, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }} +} diff --git a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go index 5e824bf767..8d4a288c4c 100644 --- a/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go @@ -21,13 +21,10 @@ import ( "fmt" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/kncloudevents" @@ -117,7 +114,9 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl kafkachannelLister: kafkaChannelInformer.Lister(), kafkachannelInformer: kafkaChannelInformer.Informer(), } - r.impl = kafkachannelreconciler.NewImpl(ctx, r) + r.impl = kafkachannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{SkipStatusUpdates: true} + }) logger.Info("Setting up event handlers") @@ -146,6 +145,16 @@ func filterWithAnnotation(namespaced bool) func(obj interface{}) bool { } func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { + logging.FromContext(ctx).Debugw("ReconcileKind for channel", zap.String("channel", kc.Name)) + return r.syncDispatcher(ctx) +} + +func (r *Reconciler) ObserveKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { + logging.FromContext(ctx).Debugw("ObserveKind for channel", zap.String("channel", kc.Name)) + return r.syncDispatcher(ctx) +} + +func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event { channels, err := r.kafkachannelLister.List(labels.Everything()) if err != nil { logging.FromContext(ctx).Error("Error listing kafka channels") @@ -172,7 +181,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel logging.FromContext(ctx).Error("Error updating kafka consumers in dispatcher") return err } - 
kc.Status.SubscribableStatus = r.createSubscribableStatus(&kc.Spec.SubscribableSpec, failedSubscriptions) if len(failedSubscriptions) > 0 { logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe") return fmt.Errorf("Some kafka subscriptions failed to subscribe") @@ -180,28 +188,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return nil } -func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1.SubscribableSpec, failedSubscriptions map[types.UID]error) eventingduckv1.SubscribableStatus { - if subscribable == nil { - return eventingduckv1.SubscribableStatus{} - } - subscriberStatus := make([]eventingduckv1.SubscriberStatus, 0) - for _, sub := range subscribable.Subscribers { - status := eventingduckv1.SubscriberStatus{ - UID: sub.UID, - ObservedGeneration: sub.Generation, - Ready: corev1.ConditionTrue, - } - if err, ok := failedSubscriptions[sub.UID]; ok { - status.Ready = corev1.ConditionFalse - status.Message = err.Error() - } - subscriberStatus = append(subscriberStatus, status) - } - return eventingduckv1.SubscribableStatus{ - Subscribers: subscriberStatus, - } -} - // newConfigFromKafkaChannels creates a new Config from the list of kafka channels. func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) *dispatcher.ChannelConfig { channelConfig := dispatcher.ChannelConfig{ diff --git a/kafka/channel/pkg/reconciler/testing/kafkachannel.go b/kafka/channel/pkg/reconciler/testing/kafkachannel.go index 24b449b6db..f9d0d378d0 100644 --- a/kafka/channel/pkg/reconciler/testing/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/testing/kafkachannel.go @@ -20,6 +20,10 @@ import ( "context" "time" + "k8s.io/apimachinery/pkg/types" + + v1 "knative.dev/eventing/pkg/apis/duck/v1" + "k8s.io/apimachinery/pkg/util/sets" appsv1 "k8s.io/api/apps/v1" @@ -38,6 +42,7 @@ func NewKafkaChannel(name, namespace string, ncopt ...KafkaChannelOption) *v1bet ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + UID: types.UID("abc-xyz"), }, Spec: v1beta1.KafkaChannelSpec{}, } @@ -133,3 +138,23 @@ func WithKafkaFinalizer(finalizerName string) KafkaChannelOption { nc.SetFinalizers(finalizers.List()) } } + +func WithKafkaChannelSubscribers(subs []v1.SubscriberSpec) KafkaChannelOption { + return func(nc *v1beta1.KafkaChannel) { + nc.Spec.Subscribers = subs + } +} + +func WithKafkaChannelStatusSubscribers() KafkaChannelOption { + return func(nc *v1beta1.KafkaChannel) { + ss := make([]v1.SubscriberStatus, len(nc.Spec.Subscribers)) + for _, s := range nc.Spec.Subscribers { + ss = append(ss, v1.SubscriberStatus{ + UID: s.UID, + ObservedGeneration: s.Generation, + Ready: corev1.ConditionTrue, + }) + } + nc.Status.Subscribers = ss + } +}
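Illustrative usage (not part of the patch): a minimal sketch of how the two pieces introduced above fit together, the AdminClient built from a ClusterAdmin factory and the ConsumerGroupWatcher that drives subscription readiness. The broker address, group-ID prefix, watcher ID, and the standalone main wrapper are made-up values for demonstration; the patch itself does this wiring inside the controller.

package main

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/Shopify/sarama"

	"knative.dev/eventing-contrib/kafka/channel/pkg/kafka"
	"knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller"
)

func main() {
	ctx := context.Background()

	// Hypothetical bootstrap server; the controller reads brokers from config-kafka.
	brokers := []string{"my-cluster-kafka-bootstrap.kafka:9092"}
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0

	// AdminClientManager keeps the factory so it can rebuild the ClusterAdmin on
	// failure (Shopify/sarama#1162) instead of reusing a broken connection.
	ac, err := kafka.NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) {
		return sarama.NewClusterAdmin(brokers, cfg)
	})
	if err != nil {
		log.Fatal(err)
	}

	// Poll consumer groups every two seconds, mirroring pollInterval in the controller.
	w := controller.NewConsumerGroupWatcher(ctx, ac, 2*time.Second)

	// The controller keys Watch/Forget on the KafkaChannel UID; any unique ID works.
	// The callback receives no payload: it re-lists the cache (level-driven) whenever
	// the poller sees a consumer group appear or disappear (edge-triggered).
	groupIDPrefix := "kafka.default.my-channel" // kafka.<namespace>.<name>
	err = w.Watch("my-channel-uid", func() {
		groups := w.List(func(cg string) bool { return strings.HasPrefix(cg, groupIDPrefix) })
		log.Printf("consumer groups backing subscriptions: %v", groups)
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := w.Start(); err != nil {
		log.Fatal(err)
	}

	// Give the poller a chance to run, then stop watching, as the controller does
	// in FinalizeKind (Forget) and on config reload (Terminate).
	time.Sleep(10 * time.Second)
	w.Forget("my-channel-uid")
}

In the patch itself, this wiring lives in updateKafkaConfig (which builds the factory with source.MakeAdminClient so the auth settings are honored) and in setupSubscriptionStatusWatcher, which matches the listed groups against each subscriber UID before patching the KafkaChannel status.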