Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Fix Kafka Channel dispatcher ownerRef (#1536)
Browse files Browse the repository at this point in the history
* Fix Kafka Channel dispatcher ownerRef

* Fix Kafka Channel dispatcher ownerRef

* Fix Kafka Channel dispatcher ownerRef

* Addressing comments

* Make more readable

* Unit tests for scale up

* Unit tests for `findContainer`
  • Loading branch information
aliok authored Sep 10, 2020
1 parent 050867e commit 1fcaa7b
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 17 deletions.
83 changes: 83 additions & 0 deletions kafka/channel/config/500-dispatcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-ch-dispatcher
namespace: knative-eventing
labels:
eventing.knative.dev/release: devel
spec:
# this deployment is going to be scaled up by the
# controller when the very first KafkaChannel is created
replicas: 0
selector:
matchLabels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
template:
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
eventing.knative.dev/release: devel
spec:
containers:
- name: dispatcher
image: ko://knative.dev/eventing-contrib/kafka/channel/cmd/channel_dispatcher
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
- name: METRICS_DOMAIN
value: "knative.dev/eventing"
- name: CONFIG_LOGGING_NAME
value: "config-logging"
- name: CONFIG_LEADERELECTION_NAME
value: "config-leader-election-kafka"
ports:
- containerPort: 9090
name: metrics
protocol: TCP
volumeMounts:
- name: config-kafka
mountPath: /etc/config-kafka
serviceAccountName: kafka-ch-dispatcher
volumes:
- name: config-kafka
configMap:
name: config-kafka

---

apiVersion: v1
kind: Service
metadata:
labels:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
name: kafka-ch-dispatcher
namespace: knative-eventing
spec:
ports:
- name: http-dispatcher
port: 80
protocol: TCP
targetPort: 8080
selector:
messaging.knative.dev/channel: kafka-channel
messaging.knative.dev/role: dispatcher
2 changes: 1 addition & 1 deletion kafka/channel/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat

producer, err := sarama.NewAsyncProducer(args.Brokers, conf)
if err != nil {
return nil, fmt.Errorf("unable to create kafka producer: %v", err)
return nil, fmt.Errorf("unable to create kafka producer against Kafka bootstrap servers %v : %v", args.Brokers, err)
}

dispatcher := &KafkaDispatcher{
Expand Down
58 changes: 47 additions & 11 deletions kafka/channel/pkg/reconciler/controller/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"context"
"errors"
"fmt"
"reflect"

"k8s.io/utils/pointer"

"github.com/Shopify/sarama"

Expand Down Expand Up @@ -274,6 +275,7 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
DispatcherScope: scope,
DispatcherNamespace: dispatcherNamespace,
Image: r.dispatcherImage,
Replicas: 1,
}

expected := resources.MakeDispatcher(args)
Expand All @@ -294,17 +296,51 @@ func (r *Reconciler) reconcileDispatcher(ctx context.Context, scope string, disp
logging.FromContext(ctx).Errorw("Unable to get the dispatcher deployment", zap.Error(err))
kc.Status.MarkDispatcherUnknown("DispatcherDeploymentFailed", "Failed to get dispatcher deployment: %v", err)
return nil, err
} else if !reflect.DeepEqual(expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image) {
logging.FromContext(ctx).Infof("Deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
} else {
existing := utils.FindContainer(d, resources.DispatcherContainerName)
if existing == nil {
logging.FromContext(ctx).Errorw("Container %s does not exist in existing dispatcher deployment. Updating the deployment", resources.DispatcherContainerName)
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(expected)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
}
return d, newDeploymentWarn(err)
}

expectedContainer := utils.FindContainer(expected, resources.DispatcherContainerName)
if expectedContainer == nil {
return nil, errors.New(fmt.Sprintf("Container %s does not exist in expected dispatcher deployment. Cannot check if the deployment needs an update.", resources.DispatcherContainerName))
}

needsUpdate := false

if existing.Image != expectedContainer.Image {
logging.FromContext(ctx).Infof("Dispatcher deployment image is not what we expect it to be, updating Deployment Got: %q Expect: %q", expected.Spec.Template.Spec.Containers[0].Image, d.Spec.Template.Spec.Containers[0].Image)
existing.Image = expectedContainer.Image
needsUpdate = true
}

if *d.Spec.Replicas == 0 {
logging.FromContext(ctx).Infof("Dispatcher deployment has 0 replicas. Scaling up deployment to 1 replicas")
d.Spec.Replicas = pointer.Int32Ptr(1)
needsUpdate = true
}

if needsUpdate {
d, err := r.KubeClientSet.AppsV1().Deployments(dispatcherNamespace).Update(d)
if err == nil {
controller.GetEventRecorder(ctx).Event(kc, corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated")
kc.Status.PropagateDispatcherStatus(&d.Status)
return d, nil
} else {
kc.Status.MarkServiceFailed("DispatcherDeploymentUpdateFailed", "Failed to update the dispatcher deployment: %v", err)
return d, newDeploymentWarn(err)
}
}
return d, newDeploymentWarn(err)
}

kc.Status.PropagateDispatcherStatus(&d.Status)
Expand Down
142 changes: 139 additions & 3 deletions kafka/channel/pkg/reconciler/controller/kafkachannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImage("differentimage"),
makeDeploymentWithImageAndReplicas("differentimage", 1),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
Expand Down Expand Up @@ -476,6 +476,141 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) {
}, zap.L()))
}

func TestDeploymentZeroReplicas(t *testing.T) {
kcKey := testNS + "/" + kcName
row := TableRow{
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImageAndReplicas(testDispatcherImage, 0),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName)),
},
WantErr: false,
WantCreates: []runtime.Object{
makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)),
},
WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: makeDeployment(),
}},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithInitKafkaChannelConditions,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName),
reconcilekafkatesting.WithKafkaChannelConfigReady(),
reconcilekafkatesting.WithKafkaChannelTopicReady(),
// reconcilekafkatesting.WithKafkaChannelDeploymentReady(),
reconcilekafkatesting.WithKafkaChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelEndpointsReady(),
reconcilekafkatesting.WithKafkaChannelChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelAddress(channelServiceAddress),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, dispatcherDeploymentUpdated, "Dispatcher deployment updated"),
Eventf(corev1.EventTypeNormal, "KafkaChannelReconciled", `KafkaChannel reconciled: "test-namespace/test-kc"`),
},
}

row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler {

r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
// TODO fix
kafkachannelInformer: nil,
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{
mockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
errMsg := sarama.ErrTopicAlreadyExists.Error()
return &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

func TestDeploymentMoreThanOneReplicas(t *testing.T) {
kcKey := testNS + "/" + kcName
row := TableRow{
Name: "Works, topic already exists",
Key: kcKey,
Objects: []runtime.Object{
makeDeploymentWithImageAndReplicas(testDispatcherImage, 3),
makeService(),
makeReadyEndpoints(),
reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName)),
},
WantErr: false,
WantCreates: []runtime.Object{
makeChannelService(reconcilekafkatesting.NewKafkaChannel(kcName, testNS)),
},
WantUpdates: []clientgotesting.UpdateActionImpl{},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconcilekafkatesting.NewKafkaChannel(kcName, testNS,
reconcilekafkatesting.WithInitKafkaChannelConditions,
reconcilekafkatesting.WithKafkaFinalizer(finalizerName),
reconcilekafkatesting.WithKafkaChannelConfigReady(),
reconcilekafkatesting.WithKafkaChannelTopicReady(),
// reconcilekafkatesting.WithKafkaChannelDeploymentReady(),
reconcilekafkatesting.WithKafkaChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelEndpointsReady(),
reconcilekafkatesting.WithKafkaChannelChannelServiceReady(),
reconcilekafkatesting.WithKafkaChannelAddress(channelServiceAddress),
),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "KafkaChannelReconciled", `KafkaChannel reconciled: "test-namespace/test-kc"`),
},
}

row.Test(t, reconcilertesting.MakeFactory(func(ctx context.Context, listers *reconcilekafkatesting.Listers, cmw configmap.Watcher) controller.Reconciler {

r := &Reconciler{
systemNamespace: testNS,
dispatcherImage: testDispatcherImage,
kafkaConfig: &KafkaConfig{
Brokers: []string{brokerName},
},
kafkachannelLister: listers.GetKafkaChannelLister(),
// TODO fix
kafkachannelInformer: nil,
deploymentLister: listers.GetDeploymentLister(),
serviceLister: listers.GetServiceLister(),
endpointsLister: listers.GetEndpointsLister(),
kafkaClusterAdmin: &mockClusterAdmin{
mockCreateTopicFunc: func(topic string, detail *sarama.TopicDetail, validateOnly bool) error {
errMsg := sarama.ErrTopicAlreadyExists.Error()
return &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
}
},
},
kafkaClientSet: fakekafkaclient.Get(ctx),
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingClient.Get(ctx),
}
return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r)
}, zap.L()))
}

type mockClusterAdmin struct {
mockCreateTopicFunc func(topic string, detail *sarama.TopicDetail, validateOnly bool) error
mockDeleteTopicFunc func(topic string) error
Expand Down Expand Up @@ -570,15 +705,16 @@ func (ca *mockClusterAdmin) DeleteConsumerGroup(group string) error {

var _ sarama.ClusterAdmin = (*mockClusterAdmin)(nil)

func makeDeploymentWithImage(image string) *appsv1.Deployment {
func makeDeploymentWithImageAndReplicas(image string, replicas int32) *appsv1.Deployment {
return resources.MakeDispatcher(resources.DispatcherArgs{
DispatcherNamespace: testNS,
Image: image,
Replicas: replicas,
})
}

func makeDeployment() *appsv1.Deployment {
return makeDeploymentWithImage(testDispatcherImage)
return makeDeploymentWithImageAndReplicas(testDispatcherImage, 1)
}

func makeReadyDeployment() *appsv1.Deployment {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"knative.dev/pkg/system"
)

const DispatcherContainerName = "dispatcher"

var (
serviceAccountName = "kafka-ch-dispatcher"
dispatcherName = "kafka-ch-dispatcher"
Expand All @@ -37,11 +39,12 @@ type DispatcherArgs struct {
DispatcherScope string
DispatcherNamespace string
Image string
Replicas int32
}

// MakeDispatcher generates the dispatcher deployment for the KafKa channel
func MakeDispatcher(args DispatcherArgs) *v1.Deployment {
replicas := int32(1)
replicas := args.Replicas

return &v1.Deployment{
TypeMeta: metav1.TypeMeta{
Expand All @@ -65,7 +68,7 @@ func MakeDispatcher(args DispatcherArgs) *v1.Deployment {
ServiceAccountName: serviceAccountName,
Containers: []corev1.Container{
{
Name: "dispatcher",
Name: DispatcherContainerName,
Image: args.Image,
Env: makeEnv(args),
Ports: []corev1.ContainerPort{{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestNewDispatcher(t *testing.T) {
DispatcherScope: "cluster",
DispatcherNamespace: testNS,
Image: imageName,
Replicas: 1,
}

replicas := int32(1)
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestNewNamespaceDispatcher(t *testing.T) {
DispatcherScope: "namespace",
DispatcherNamespace: testNS,
Image: imageName,
Replicas: 1,
}

replicas := int32(1)
Expand Down
Loading

0 comments on commit 1fcaa7b

Please sign in to comment.