This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Fix Kafka Channel dispatcher ownerRef #1536

Merged
83 changes: 83 additions & 0 deletions kafka/channel/config/500-dispatcher.yaml
@@ -0,0 +1,83 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-ch-dispatcher
  namespace: knative-eventing
  labels:
    eventing.knative.dev/release: devel
spec:
  # this deployment is going to be scaled up by the
  # controller when the very first KafkaChannel is created
  replicas: 0
  selector:
    matchLabels:
      messaging.knative.dev/channel: kafka-channel
      messaging.knative.dev/role: dispatcher
  template:
    metadata:
      labels:
        messaging.knative.dev/channel: kafka-channel
        messaging.knative.dev/role: dispatcher
        eventing.knative.dev/release: devel
    spec:
      containers:
        - name: dispatcher
          image: ko://knative.dev/eventing-contrib/kafka/channel/cmd/channel_dispatcher
          env:
            - name: SYSTEM_NAMESPACE
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
            - name: METRICS_DOMAIN
              value: "knative.dev/eventing"
            - name: CONFIG_LOGGING_NAME
              value: "config-logging"
            - name: CONFIG_LEADERELECTION_NAME
              value: "config-leader-election-kafka"
          ports:
            - containerPort: 9090
              name: metrics
              protocol: TCP
          volumeMounts:
            - name: config-kafka
              mountPath: /etc/config-kafka
      serviceAccountName: kafka-ch-dispatcher
      volumes:
        - name: config-kafka
          configMap:
            name: config-kafka

---

apiVersion: v1
kind: Service
metadata:
  labels:
    messaging.knative.dev/channel: kafka-channel
    messaging.knative.dev/role: dispatcher
  name: kafka-ch-dispatcher
  namespace: knative-eventing
spec:
  ports:
    - name: http-dispatcher
      port: 80
      protocol: TCP
      targetPort: 8080
  selector:
    messaging.knative.dev/channel: kafka-channel
    messaging.knative.dev/role: dispatcher
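Worth noting: the Deployment above ships with replicas: 0 and relies on the channel controller to scale it up once the first KafkaChannel appears. As a rough sketch of that idea only — a hypothetical standalone helper, not this PR's code (the real logic lives in reconcileDispatcher further down) — using the same pre-1.18 client-go call signatures that appear elsewhere in this diff:

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/pointer"
)

// scaleUpDispatcher scales the dispatcher Deployment from 0 to 1 replica.
// Illustrative helper only; the names match the manifest above.
func scaleUpDispatcher(kube kubernetes.Interface) error {
	d, err := kube.AppsV1().Deployments("knative-eventing").Get("kafka-ch-dispatcher", metav1.GetOptions{})
	if err != nil {
		return err
	}
	if d.Spec.Replicas == nil || *d.Spec.Replicas == 0 {
		d.Spec.Replicas = pointer.Int32Ptr(1)
		_, err = kube.AppsV1().Deployments("knative-eventing").Update(d)
	}
	return err
}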
2 changes: 1 addition & 1 deletion kafka/channel/pkg/dispatcher/dispatcher.go
@@ -90,7 +90,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

	producer, err := sarama.NewAsyncProducer(args.Brokers, conf)
	if err != nil {
-		return nil, fmt.Errorf("unable to create kafka producer: %v", err)
+		return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err)
	}

	dispatcher := &KafkaDispatcher{
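For a sense of what the enriched message buys at runtime, a runnable toy example with a hypothetical broker list and a stand-in error (only the format string comes from the diff above):

package main

import "fmt"

func main() {
	brokers := []string{"my-cluster-kafka-bootstrap.kafka:9092"}           // hypothetical bootstrap server
	cause := fmt.Errorf("kafka: client has run out of available brokers") // stand-in for the sarama error
	err := fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", brokers, cause)
	fmt.Println(err)
	// unable to create kafka producer against Kafka bootstrap servers
	// [my-cluster-kafka-bootstrap.kafka:9092] : kafka: client has run out of available brokers
}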
58 changes: 47 additions & 11 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
@@ -20,7 +20,8 @@ import (
	"context"
	"errors"
	"fmt"
-	"reflect"
+
+	"k8s.io/utils/pointer"

	"github.com/Shopify/sarama"

@@ -274,6 +275,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
		DispatcherScope:     scope,
		DispatcherNamespace: dispatcherNamespace,
		Image:               r.dispatcherImage,
+		Replicas:            1,
	}

	expected := resources.MakeDispatcher(args)
@@ -294,17 +296,51 @@
		logging.FromContext(ctx).Errorw("Unable to get the dispatcher deployment", zap.Error(err))
		kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
		return nil, err
-	} else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
-		logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
-		d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
-		if err == nil {
-			controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
-			kc.Status.PropagateDispatcherStatus(&d.Status)
-			return d, nil
-		} else {
-			kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
+	} else {
+		existing := utils.FindContainer(d, resources.DispatcherContainerName)
+		if existing == nil {
+			logging.FromContext(ctx).Errorf("Container %s does not exist in existing dispatcher deployment. Updating the deployment", resources.DispatcherContainerName)
+			d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
+			if err == nil {
+				controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
+				kc.Status.PropagateDispatcherStatus(&d.Status)
+				return d, nil
+			} else {
+				kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
+			}
+			return d, newDeploymentWarn(err)
Review comment on lines +303 to +311 (Member Author):
Pushed a new commit and made it easier to understand now.
Looks more similar to old code: https://github.com/knative/eventing-contrib/blob/master/kafka/channel/pkg/reconciler/controller/kafkachannel.go#L307

		}
+
+		expectedContainer := utils.FindContainer(expected, resources.DispatcherContainerName)
+		if expectedContainer == nil {
+			return nil, fmt.Errorf("Container %s does not exist in expected dispatcher deployment. Cannot check if the deployment needs an update.", resources.DispatcherContainerName)
+		}
+
+		needsUpdate := false
+
+		if existing.Image != expectedContainer.Image {
+			logging.FromContext(ctx).Infof("Dispatcher deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", existing.Image, expectedContainer.Image)
+			existing.Image = expectedContainer.Image
+			needsUpdate = true
+		}
+
+		if *d.Spec.Replicas == 0 {
+			logging.FromContext(ctx).Infof("Dispatcher deployment has 0 replicas. Scaling up deployment to 1 replicas")
Review comment (Member): nit: replicas -> replica, both instances.

+			d.Spec.Replicas = pointer.Int32Ptr(1)
+			needsUpdate = true
+		}
+
+		if needsUpdate {
+			d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(d)
+			if err == nil {
+				controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
+				kc.Status.PropagateDispatcherStatus(&d.Status)
+				return d, nil
+			} else {
+				kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
+				return d, newDeploymentWarn(err)
+			}
+		}
		return d, newDeploymentWarn(err)
	}

	kc.Status.PropagateDispatcherStatus(&d.Status)
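A note on utils.FindContainer, which the reconciler now leans on but which is not part of this changeset: judging from the call sites, it has to return a pointer into the Deployment's container slice, since existing.Image is mutated in place before the Update call. A minimal sketch of what such a helper presumably looks like, under that assumption:

package utils

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
)

// FindContainer returns a pointer to the named container in the
// Deployment's pod template, or nil if none matches. Returning a
// pointer into the slice is what lets callers mutate the container
// (e.g. existing.Image) in place.
func FindContainer(d *appsv1.Deployment, containerName string) *corev1.Container {
	for i := range d.Spec.Template.Spec.Containers {
		if d.Spec.Template.Spec.Containers[i].Name == containerName {
			return &d.Spec.Template.Spec.Containers[i]
		}
	}
	return nil
}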
142 changes: 139 additions & 3 deletions kafka/channel/pkg/reconciler/controller/kafkachannel_test.go
@@ -413,7 +413,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
		Name: "Works, topic already exists",
		Key:  kcKey,
		Objects: []runtime.Object{
-			makeDeploymentWithImage("differentimage"),
+			makeDeploymentWithImageAndReplicas("differentimage", 1),
			makeService(),
			makeReadyEndpoints(),
			reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
@@ -476,6 +476,141 @@
}, zap.L()))
}

func TestDeploymentZeroReplicas(t *testing.T) {
kcKey := testNS + "/" + kcName
row := TableRow{
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImageAndReplicas(testDispatcherImage, 0),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName)),
},
WantErr: false,
WantCreates: []runtime.Object{
makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)),
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: makeDeployment(),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithInitKafkaChannelConditions,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName),
reconcilekafkatesting.WithKafkaChannelConfigReady(),
reconcilekafkatesting.WithKafkaChannelTopicReady(),
// reconcilekafkatesting.WithKafkaChannelDeploymentReady(),
reconcilekafkatesting.WithKafkaChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelEndpointsReady(),
reconcilekafkatesting.WithKafkaChannelChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelAddress(channelServiceAddress),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated"),
Eventf(corev1.EventTypeNormal, "KafkaChannelReconciled", `KafkaChannel reconciled: "test-namespace/test-kc"`),
},
}

row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler {

r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
// TODO fix
kafkachannelInformer: nil,
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{
mockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
errMsg := sarama.ErrTopicAlreadyExists.Error()
return &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

func TestDeploymentMoreThanOneReplicas(t *testing.T) {
kcKey := testNS + "/" + kcName
row := TableRow{
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImageAndReplicas(testDispatcherImage, 3),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName)),
},
WantErr: false,
WantCreates: []runtime.Object{
makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)),
},
WantUpdates: []clientgotesting.UpdateActionImpl{},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithInitKafkaChannelConditions,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName),
reconcilekafkatesting.WithKafkaChannelConfigReady(),
reconcilekafkatesting.WithKafkaChannelTopicReady(),
// reconcilekafkatesting.WithKafkaChannelDeploymentReady(),
reconcilekafkatesting.WithKafkaChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelEndpointsReady(),
reconcilekafkatesting.WithKafkaChannelChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelAddress(channelServiceAddress),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "KafkaChannelReconciled", `KafkaChannel reconciled: "test-namespace/test-kc"`),
},
}

row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler {

r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
// TODO fix
kafkachannelInformer: nil,
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{
mockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
errMsg := sarama.ErrTopicAlreadyExists.Error()
return &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

type mockClusterAdmin struct {
mockCreateTopicFunc func(topic string, detail *sarama.TopicDetail, validateOnly bool) error
mockDeleteTopicFunc func(topic string) error
@@ -570,15 +705,16 @@ func (ca *mockClusterAdmin) DeleteConsumerGroup(group string) error {

var _ sarama.ClusterAdmin = (*mockClusterAdmin)(nil)

-func makeDeploymentWithImage(image string) *appsv1.Deployment {
+func makeDeploymentWithImageAndReplicas(image string, replicas int32) *appsv1.Deployment {
	return resources.MakeDispatcher(resources.DispatcherArgs{
		DispatcherNamespace: testNS,
		Image:               image,
+		Replicas:            replicas,
	})
}

func makeDeployment() *appsv1.Deployment {
-	return makeDeploymentWithImage(testDispatcherImage)
+	return makeDeploymentWithImageAndReplicas(testDispatcherImage, 1)
}

func makeReadyDeployment() *appsv1.Deployment {
@@ -24,6 +24,8 @@ import (
"knative.dev/pkg/system"
)

+const DispatcherContainerName = "dispatcher"

var (
serviceAccountName = "kafka-ch-dispatcher"
dispatcherName = "kafka-ch-dispatcher"
@@ -37,11 +39,12 @@ type DispatcherArgs struct {
	DispatcherScope     string
	DispatcherNamespace string
	Image               string
+	Replicas            int32
}

// MakeDispatcher generates the dispatcher deployment for the Kafka channel
func MakeDispatcher(args DispatcherArgs) *v1.Deployment {
-	replicas := int32(1)
+	replicas := args.Replicas

return &v1.Deployment{
TypeMeta: metav1.TypeMeta{
@@ -65,7 +68,7 @@ func MakeDispatcher(args DispatcherArgs) *v1.Deployment {
			ServiceAccountName: serviceAccountName,
			Containers: []corev1.Container{
				{
-					Name:  "dispatcher",
+					Name:  DispatcherContainerName,
					Image: args.Image,
					Env:   makeEnv(args),
					Ports: []corev1.ContainerPort{{
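With the new Replicas field, callers choose the initial scale instead of the previously hard-coded int32(1). A hedged usage sketch — field values and the import path are illustrative; only the field names come from this diff:

package controller

import (
	appsv1 "k8s.io/api/apps/v1"

	"knative.dev/eventing-contrib/kafka/channel/pkg/reconciler/controller/resources"
)

// makeScaledDownDispatcher illustrates the new Replicas knob: 0 defers
// scale-up to the controller, matching the shipped YAML manifest.
func makeScaledDownDispatcher() *appsv1.Deployment {
	return resources.MakeDispatcher(resources.DispatcherArgs{
		DispatcherScope:     "cluster",          // or "namespace"
		DispatcherNamespace: "knative-eventing", // illustrative namespace
		Image:               "example.com/dispatcher:latest", // hypothetical image
		Replicas:            0,
	})
}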
@@ -38,6 +38,7 @@ func TestNewDispatcher(t *testing.T) {
		DispatcherScope:     "cluster",
		DispatcherNamespace: testNS,
		Image:               imageName,
+		Replicas:            1,
	}

replicas := int32(1)
@@ -120,6 +121,7 @@ func TestNewNamespaceDispatcher(t *testing.T) {
		DispatcherScope:     "namespace",
		DispatcherNamespace: testNS,
		Image:               imageName,
+		Replicas:            1,
	}

replicas := int32(1)