Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper committed Apr 7, 2021
1 parent 4e0128a commit 51d46f1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
45 changes: 44 additions & 1 deletion pkg/channel/consolidated/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error {
return errors.New("nil config")
}

// TODO why we have a lock inside another lock for this map?!
d.hostToChannelMapLock.Lock()
defer d.hostToChannelMapLock.Unlock()

Expand All @@ -253,6 +254,44 @@ func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error {
return nil
}

func (d *KafkaDispatcher) CleanupChannel(name, namespace, hostname string) error {
channelRef := eventingchannels.ChannelReference{
Name: name,
Namespace: namespace,
}

// Remove from the hostToChannel map the mapping with this channel
// TODO why we have a lock inside another lock for this map?!
d.hostToChannelMapLock.Lock()
hcMap := d.getHostToChannelMap()
if hcMap != nil {
delete(hcMap, hostname)
d.setHostToChannelMap(hcMap)
}
d.hostToChannelMapLock.Unlock()

// Remove all subs
d.consumerUpdateLock.Lock()
defer d.consumerUpdateLock.Unlock()

if d.channelSubscriptions[channelRef] == nil {
// No subs to remove
return nil
}

// Avoid concurrent modification while iterating
subs := make([]types.UID, len(d.channelSubscriptions[channelRef].subs))
copy(subs, d.channelSubscriptions[channelRef].subs)

for _, s := range subs {
if err := d.unsubscribe(channelRef, d.subscriptions[s]); err != nil {
return err
}
}

return nil
}

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error {
Expand Down Expand Up @@ -306,7 +345,11 @@ func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference,
newSlice = append(newSlice, oldSub)
}
}
d.channelSubscriptions[channel].subs = newSlice
if len(newSlice) != 0 {
d.channelSubscriptions[channel].subs = newSlice
} else {
delete(d.channelSubscriptions, channel)
}
}
if consumer, ok := d.subsConsumerGroups[sub.UID]; ok {
delete(d.subsConsumerGroups, sub.UID)
Expand Down
14 changes: 14 additions & 0 deletions pkg/channel/consolidated/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ func (r *Reconciler) ObserveKind(ctx context.Context, kc *v1beta1.KafkaChannel)
return r.syncDispatcher(ctx)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event {
logging.FromContext(ctx).Debugw("FinalizeKind for channel", zap.String("channel", kc.Name))
return r.finalizeChannel(ctx, kc)
}

func (r *Reconciler) ObserveFinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event {
logging.FromContext(ctx).Debugw("ObserveFinalizeKind for channel", zap.String("channel", kc.Name))
return r.finalizeChannel(ctx, kc)
}

func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event {
channels, err := r.kafkachannelLister.List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -191,6 +201,10 @@ func (r *Reconciler) syncDispatcher(ctx context.Context) pkgreconciler.Event {
return nil
}

func (r *Reconciler) finalizeChannel(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event {
return r.kafkaDispatcher.CleanupChannel(kc.Name, kc.Namespace, kc.Status.Address.URL.Host)
}

// newConfigFromKafkaChannels creates a new Config from the list of kafka channels.
func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1beta1.KafkaChannel) *dispatcher.ChannelConfig {
channelConfig := dispatcher.ChannelConfig{
Expand Down

0 comments on commit 51d46f1

Please sign in to comment.