Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

delete ra deployment when sink is not found #1533

Merged
merged 13 commits into from
Sep 11, 2020
12 changes: 12 additions & 0 deletions kafka/source/pkg/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/reconciler/source"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

Expand Down Expand Up @@ -112,6 +113,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource
sinkURI, err := r.sinkResolver.URIFromDestinationV1(*dest, src)
if err != nil {
src.Status.MarkNoSink("NotFound", "")
//delete adapter deployment if sink not found
if err := r.deleteReceiveAdapter(ctx, src); err != nil && !apierrors.IsNotFound(err) {
logging.FromContext(ctx).Error("Unable to delete receiver adapter when sink is missing", zap.Error(err))
}
return fmt.Errorf("getting sink URI: %v", err)
}
src.Status.MarkSink(sinkURI)
Expand Down Expand Up @@ -186,6 +191,13 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1beta1.Kafk
return ra, nil
}

//deleteReceiveAdapter deletes the reciever adapter deployment if any
func (r *Reconciler) deleteReceiveAdapter(ctx context.Context, src *v1beta1.KafkaSource) error {
name := utils.GenerateFixedName(src, fmt.Sprintf("kafkasource-%s", src.Name))

return r.KubeClientSet.AppsV1().Deployments(src.Namespace).Delete(name, &metav1.DeleteOptions{})
}

func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) {
return true
Expand Down
3 changes: 3 additions & 0 deletions test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fi
# Eventing main config path from HEAD.
readonly EVENTING_CONFIG="./config/"
readonly EVENTING_MT_CHANNEL_BROKER_CONFIG="./config/brokers/mt-channel-broker"
readonly EVENTING_IN_MEMORY_CHANNEL_CONFIG="./config/channels/in-memory-channel"

# Vendored eventing test iamges.
readonly VENDOR_EVENTING_TEST_IMAGES="vendor/knative.dev/eventing/test/test_images/"
Expand Down Expand Up @@ -89,6 +90,8 @@ function knative_setup() {
ko apply -f ${EVENTING_CONFIG}
# Install MT Channel Based Broker
ko apply -f ${EVENTING_MT_CHANNEL_BROKER_CONFIG}
# Install IMC
ko apply -f ${EVENTING_IN_MEMORY_CHANNEL_CONFIG}
popd
fi
wait_until_pods_running knative-eventing || fail_test "Knative Eventing did not come up"
Expand Down
52 changes: 52 additions & 0 deletions test/e2e/helpers/kafka_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/uuid"

"github.com/davecgh/go-spew/spew"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -33,16 +34,22 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
testlib "knative.dev/eventing/test/lib"
pkgtest "knative.dev/pkg/test"

sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
kafkaclientset "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
)

const (
strimziApiGroup = "kafka.strimzi.io"
strimziApiVersion = "v1beta1"
strimziTopicResource = "kafkatopics"
interval = 3 * time.Second
timeout = 30 * time.Second
)

var (
topicGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziTopicResource}
ImcGVR = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1beta1", Resource: "inmemorychannels"}
)

func MustPublishKafkaMessage(client *testlib.Client, bootstrapServer string, topic string, key string, headers map[string]string, value string) {
Expand Down Expand Up @@ -232,3 +239,48 @@ func MustCreateTopic(client *testlib.Client, clusterName string, clusterNamespac

client.Tracker.Add(topicGVR.Group, topicGVR.Version, topicGVR.Resource, clusterNamespace, topicName)
}

//CheckKafkaSourceState waits for specified kafka source resource state
//On timeout reports error
func CheckKafkaSourceState(c *testlib.Client, name string, inState func(ks *sourcesv1beta1.KafkaSource) (bool, error)) error {
kafkaSourceClientSet, err := kafkaclientset.NewForConfig(c.Config)
if err != nil {
return err
}
kSources := kafkaSourceClientSet.SourcesV1beta1().KafkaSources(c.Namespace)
var lastState *sourcesv1beta1.KafkaSource
waitErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
var err error
lastState, err = kSources.Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
return inState(lastState)
})
if waitErr != nil {
return fmt.Errorf("kafkasource %q is not in desired state, got: %+v: %w", name, lastState, waitErr)
}
return nil
}

//CheckRADeployment waits for desired state of receiver adapter
//On timeout reports error
func CheckRADeployment(c *testlib.Client, name string, inState func(deps *appsv1.DeploymentList) (bool, error)) error {
listOptions := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", "eventing.knative.dev/SourceName", name),
}
kDeps := c.Kube.Kube.AppsV1().Deployments(c.Namespace)
var lastState *appsv1.DeploymentList
waitErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
var err error
lastState, err = kDeps.List(listOptions)
if err != nil {
return true, err
}
return inState(lastState)
})
if waitErr != nil {
return fmt.Errorf("receiver adapter deployments %q is not in desired state, got: %+v: %w", name, lastState, waitErr)
}
return nil
}
131 changes: 131 additions & 0 deletions test/e2e/kafka_source_reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//+build e2e

/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"testing"

appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
"knative.dev/pkg/apis"
pkgTest "knative.dev/pkg/test"

sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/eventing-contrib/test/e2e/helpers"
contribtestlib "knative.dev/eventing-contrib/test/lib"
contribresources "knative.dev/eventing-contrib/test/lib/resources"
)

const (
rtKafkaSourceName = "e2e-rt-kafka-source"
rtChannelName = "e2e-rt-channel"
rtKafkaConsumerGroup = "e2e-rt-cg"
rtKafkaTopicName = "e2e-rt-topic"
)

//TestKafkaSourceReconciler tests various kafka source reconciler statuses
//RT is short for reconciler test
func TestKafkaSourceReconciler(t *testing.T) {
client := testlib.Setup(t, true)
defer testlib.TearDown(client)

for _, test := range []struct {
name string
action func(c *testlib.Client)
expectedStatuses sets.String
wantRADepCount int
}{{
"create_kafka_source",
createKafkaSourceWithSinkMissing,
sets.NewString("NotFound"),
0,
}, {
"create_sink",
createChannel,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why you're using the kafka channel here 😄, if you need to create a "sink", can you use a simpler one (like the test images to receive events)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to use the simplest one like you have mentioned, but if am not wrong, if the sink is "v1/Service", it is not validated to see if its available or not. Reconciles to true always. Next option was to use ksvc, but i dint see any ksvc used as sink in any of the eventing tests nor the knative serving components getting installed. Thats why used a channel. Willing to change to simpler ones if there are other options.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you need an addressable, so maybe using a kube service with a pod behind the hood is fine? I would love as much as possible to have this "dependency" between the kafka source tests and kafka channel

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but with kube service as i mentioned above , its not reporting the right status on the kafks source. It always reconciles to true whether its there are not. I initially tried with kube service only.

Copy link
Contributor

@slinkydeveloper slinkydeveloper Sep 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok so then, more than creating a kafka channel, can you create an imc channel and clearly state you need it for that purpose? Because IMC is part of the eventing "core release", it's installed in the test env for sure. You cannot make the same assumption with kafka channel

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the sink is "v1/Service", it is not validated to see if its available or not.

Oh, ok, this is the reason that I wasn't able to reproduce the issue. I was trying with a Kube service

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used imc initially, the tests failed, then switched to kafka channel. Tried one more time with imc, got the below logs in tests

=== RUN   TestKafkaSourceReconciler/create_sink
=== CONT  TestKafkaSourceReconciler
    creation.go:77: Creating channel &TypeMeta{Kind:InMemoryChannel,APIVersion:messaging.knative.dev/v1beta1,}-e2e-rt-channel
    creation.go:85: Failed to create "InMemoryChannel" "e2e-rt-channel": the server could not find the requested resource
    creation.go:90: Failed to create "InMemoryChannel" "e2e-rt-channel": the server could not find the requested resource
=== CONT  TestKafkaSourceReconciler/create_sink
    testing.go:964: test executed panic(nil) or runtime.Goexit: subtest may have called FailNow on a parent test
=== CONT  TestKafkaSourceReconciler
    test_runner.go:195: EVENT: {{ } {e2e-rt-kafka-source.163333e31993bfcc  test-kafka-source-reconciler-wbvf2 /api/v1/namespaces/test-kafka-source-reconciler-wbvf2/events/e2e-rt-kafka-source.163333e31993bfcc 278249f1-af24-4582-93f9-4811b4e4e239 1172 0 2020-09-09 19:22:23 +0000 UTC <nil> <nil> map[] map[] [] []  []} {KafkaSource test-kafka-source-reconciler-wbvf2 e2e-rt-kafka-source be8ceb0b-957b-4e18-813c-5c40244dbdd3 sources.knative.dev/v1beta1 19196 } InternalError getting sink URI: failed to get lister for messaging.knative.dev/v1beta1, Resource=inmemorychannels: inmemorychannels.messaging.knative.dev is forbidden: User "system:serviceaccount:knative-sources:kafka-controller-manager" cannot list resource "inmemorychannels" in API group "messaging.knative.dev" at the cluster scope {kafkasource-controller } 2020-09-09 19:22:23 +0000 UTC 2020-09-09 19:22:25 +0000 UTC 6 Warning 0001-01-01 00:00:00 +0000 UTC nil  nil 

Copy link
Contributor

@slinkydeveloper slinkydeveloper Sep 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first 2 errors (failed to create) are weird, i don't get what's the problem there tbh, double check all the code you use to start the imc. in fact, in testlib.Client there should be a method to create a imc channel (it's from eventing/test/lib), you don't need to develop it by yourself.

The last error is clearly an issue of cluster roles, seems like you need some specific cluster role to check if the addressable is ready. @matzew any thoughts on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imc was not set up as part of e2e set up, added those in the e2e scripts and test passes. @slinkydeveloper @matzew

sets.NewString(""),
1,
}, {
"delete_sink",
deleteChannel,
sets.NewString("NotFound"),
0,
}, {
"create_sink_after_delete",
createChannel,
sets.NewString(""),
1,
}} {
t.Run(test.name, func(t *testing.T) {
testKafkaSourceReconciler(client, test.name, test.action, test.expectedStatuses, test.wantRADepCount)
})
}
}

func testKafkaSourceReconciler(c *testlib.Client, name string, doAction func(c *testlib.Client), expectedStatuses sets.String, wantRADepCount int) {
doAction(c)

if err := helpers.CheckKafkaSourceState(c, rtKafkaSourceName, func(ks *sourcesv1beta1.KafkaSource) (bool, error) {
ready := ks.Status.GetCondition(apis.ConditionReady)
if ready != nil {
if expectedStatuses.Has(ready.Reason) {
return true, nil
}
}
return false, nil
}); err != nil {
c.T.Fatalf("Failed to validate kafkasource state, expected status : %v, err : %v", expectedStatuses.UnsortedList(), err)
}

if err := helpers.CheckRADeployment(c, rtKafkaSourceName, func(deps *appsv1.DeploymentList) (bool, error) {
if len(deps.Items) == wantRADepCount {
return true, nil
}
return false, nil
}); err != nil {
c.T.Fatal("Failed to validate adapter deployment state:", err)
}
}

func createKafkaSourceWithSinkMissing(c *testlib.Client) {
helpers.MustCreateTopic(c, kafkaClusterName, kafkaClusterNamespace, rtKafkaTopicName)

contribtestlib.CreateKafkaSourceV1Beta1OrFail(c, contribresources.KafkaSourceV1Beta1(
kafkaBootstrapUrl,
rtKafkaTopicName,
pkgTest.CoreV1ObjectReference(resources.InMemoryChannelKind, resources.MessagingAPIVersion, rtChannelName),
contribresources.WithNameV1Beta1(rtKafkaSourceName),
contribresources.WithConsumerGroupV1Beta1(rtKafkaConsumerGroup),
))
}

func createChannel(c *testlib.Client) {
c.CreateChannelOrFail(rtChannelName, &metav1.TypeMeta{
APIVersion: resources.MessagingAPIVersion,
Kind: resources.InMemoryChannelKind,
})
c.WaitForAllTestResourcesReadyOrFail()
}

func deleteChannel(c *testlib.Client) {
contribtestlib.DeleteResourceOrFail(c, rtChannelName, helpers.ImcGVR)
}
30 changes: 30 additions & 0 deletions test/lib/deletion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"k8s.io/apimachinery/pkg/runtime/schema"

testlib "knative.dev/eventing/test/lib"
)

func DeleteResourceOrFail(c *testlib.Client, name string, gvr schema.GroupVersionResource) {
unstructured := c.Dynamic.Resource(gvr).Namespace(c.Namespace)
if err := unstructured.Delete(name, nil); err != nil {
c.T.Fatalf("Failed to delete the resource %q : %v", name, err)
}
}