Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Backport usage of Destination in CamelSource (#700) to release-0.10 b…
Browse files Browse the repository at this point in the history
…ranch (#718)
  • Loading branch information
nicolaferraro authored and knative-prow-robot committed Nov 21, 2019
1 parent cfa9e6e commit 302c5e0
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 73 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 24 additions & 7 deletions camel/source/pkg/apis/sources/v1alpha1/camelsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
duckapis "knative.dev/pkg/apis"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
Expand Down Expand Up @@ -51,16 +52,16 @@ var _ = duck.VerifyType(&CamelSource{}, &duckv1.Conditions{})

const (
// CamelSourceConditionReady has status True when the CamelSource is ready to send events.
CamelConditionReady = duckapis.ConditionReady
CamelConditionReady = apis.ConditionReady

// CamelConditionSinkProvided has status True when the CamelSource has been configured with a sink target.
CamelConditionSinkProvided duckapis.ConditionType = "SinkProvided"
CamelConditionSinkProvided apis.ConditionType = "SinkProvided"

// CamelConditionDeployed has status True when the CamelSource has had it's deployment created.
CamelConditionDeployed duckapis.ConditionType = "Deployed"
CamelConditionDeployed apis.ConditionType = "Deployed"
)

var camelCondSet = duckapis.NewLivingConditionSet(
var camelCondSet = apis.NewLivingConditionSet(
CamelConditionSinkProvided,
CamelConditionDeployed,
)
Expand All @@ -72,7 +73,7 @@ type CamelSourceSpec struct {

// Sink is a reference to an object that will resolve to a domain name to use as the sink.
// +optional
Sink *corev1.ObjectReference `json:"sink,omitempty"`
Sink *apisv1alpha1.Destination `json:"sink,omitempty"`

// CloudEventOverrides defines overrides to control the output format and
// modifications of the event sent to the sink.
Expand Down Expand Up @@ -113,7 +114,7 @@ type CamelSourceStatus struct {
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (s *CamelSourceStatus) GetCondition(t duckapis.ConditionType) *duckapis.Condition {
func (s *CamelSourceStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return camelCondSet.Manage(s).GetCondition(t)
}

Expand All @@ -137,6 +138,22 @@ func (s *CamelSourceStatus) MarkSink(uri string) {
}
}

// MarkSinkWarnDeprecated sets the condition that the source has a sink configured and warns ref is deprecated.
func (s *CamelSourceStatus) MarkSinkWarnRefDeprecated(uri string) {
s.SinkURI = uri
if len(uri) > 0 {
c := apis.Condition{
Type: CamelConditionSinkProvided,
Status: corev1.ConditionTrue,
Severity: apis.ConditionSeverityError,
Message: "Using deprecated object ref fields when specifying spec.sink. Update to spec.sink.ref. These will be removed in a future release.",
}
camelCondSet.Manage(s).SetCondition(c)
} else {
camelCondSet.Manage(s).MarkUnknown(CamelConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.")
}
}

// MarkNoSink sets the condition that the source does not have a sink configured.
func (s *CamelSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) {
camelCondSet.Manage(s).MarkFalse(CamelConditionSinkProvided, reason, messageFormat, messageA...)
Expand Down
10 changes: 5 additions & 5 deletions camel/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 44 additions & 13 deletions camel/source/pkg/reconciler/camelsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reconciler

import (
"context"
"fmt"
"log"
"strings"

Expand All @@ -30,12 +31,15 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record"
"knative.dev/eventing-contrib/camel/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/camel/source/pkg/reconciler/resources"
"knative.dev/eventing-contrib/pkg/controller/sdk"
"knative.dev/eventing-contrib/pkg/controller/sinks"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -59,27 +63,36 @@ func Add(mgr manager.Manager, logger *zap.SugaredLogger) error {
mgr.GetScheme().AddKnownTypes(camelv1alpha1.SchemeGroupVersion, &camelv1alpha1.IntegrationPlatform{}, &camelv1alpha1.IntegrationPlatformList{})
metav1.AddToGroupVersion(mgr.GetScheme(), camelv1alpha1.SchemeGroupVersion)

ctx := context.Background()
dynamicClient, err := dynamic.NewForConfig(mgr.GetConfig())
if err != nil {
return err
}
ctx = context.WithValue(ctx, dynamicclient.Key{}, dynamicClient)

p := &sdk.Provider{
AgentName: controllerAgentName,
Parent: &v1alpha1.CamelSource{},
Owns: []runtime.Object{&camelv1alpha1.Integration{}},
Reconciler: &reconciler{
recorder: mgr.GetEventRecorderFor(controllerAgentName),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor(controllerAgentName),
scheme: mgr.GetScheme(),
sinkResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}),
},
}

err := p.Add(mgr, logger)
err = p.Add(mgr, logger)
if err != nil {
log.Println("Camel K cluster resources not installed correctly. Follow installation instructions at: https://github.com/apache/camel-k")
}
return err
}

type reconciler struct {
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
sinkResolver *resolver.URIResolver
}

// A CamelSource delegates the task of starting up the required containers to a Camel K Integration resource, that
Expand Down Expand Up @@ -113,12 +126,34 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
source.Status.ObservedGeneration = source.Generation
source.Status.InitializeConditions()

sinkURI, err := sinks.GetSinkURI(ctx, r.client, source.Spec.Sink, source.Namespace)
if source.Spec.Sink == nil {
source.Status.MarkNoSink("SinkMissing", "")
return fmt.Errorf("spec.sink missing")
}

dest := source.Spec.Sink.DeepCopy()
// fill optional data in destination
if dest.Ref != nil {
if dest.Ref.Namespace == "" {
dest.Ref.Namespace = source.GetNamespace()
}
} else if dest.DeprecatedName != "" && dest.DeprecatedNamespace == "" {
dest.DeprecatedNamespace = source.GetNamespace()
}

sinkURI, err := r.sinkResolver.URIFromDestination(*dest, source)
if err != nil {
source.Status.MarkNoSink("NotFound", "")
return err
}
source.Status.MarkSink(sinkURI)

if dest.DeprecatedAPIVersion != "" &&
dest.DeprecatedKind != "" &&
dest.DeprecatedName != "" {
source.Status.MarkSinkWarnRefDeprecated(sinkURI)
} else {
source.Status.MarkSink(sinkURI)
}

integration, err := r.reconcileIntegration(ctx, source, sinkURI)
if err != nil {
Expand All @@ -140,10 +175,6 @@ func (r *reconciler) reconcileIntegration(ctx context.Context, source *v1alpha1.
Namespace: source.Namespace,
Source: source.Spec.Source,
SinkURL: sinkURI,
SinkType: metav1.TypeMeta{
Kind: source.Spec.Sink.Kind,
APIVersion: source.Spec.Sink.APIVersion,
},
}
if source.Spec.CloudEventOverrides != nil {
args.Overrides = make(map[string]string)
Expand Down
70 changes: 38 additions & 32 deletions camel/source/pkg/reconciler/camelsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package reconciler

import (
"context"
"fmt"
"testing"

Expand All @@ -27,12 +28,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
sourcesv1alpha1 "knative.dev/eventing-contrib/camel/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/camel/source/pkg/reconciler/resources"
controllertesting "knative.dev/eventing-contrib/pkg/controller/testing"
duckv1alpha1 "knative.dev/pkg/apis/duck/v1alpha1"
"knative.dev/pkg/apis/v1alpha1"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/resolver"
)

var (
Expand All @@ -53,8 +59,8 @@ const (
generation = 1

addressableName = "testsink"
addressableKind = "Sink"
addressableAPIVersion = "duck.knative.dev/v1alpha1"
addressableKind = "Service"
addressableAPIVersion = "serving.knative.dev/v1"
addressableDNS = "addressable.sink.svc.cluster.local"
addressableURI = "http://addressable.sink.svc.cluster.local"
)
Expand Down Expand Up @@ -107,7 +113,7 @@ func TestReconcile(t *testing.T) {
WantAbsent: []runtime.Object{
getContext(),
},
WantErrMsg: "sinks.duck.knative.dev \"testsink\" not found",
WantErrMsg: `failed to get ref &ObjectReference{Kind:Service,Namespace:testnamespace,Name:testsink,UID:,APIVersion:serving.knative.dev/v1,ResourceVersion:,FieldPath:,}: services.serving.knative.dev "testsink" not found`,
},
{
Name: "Creating integration",
Expand Down Expand Up @@ -269,16 +275,22 @@ func TestReconcile(t *testing.T) {
}

c := tc.GetClient()
dynClient := fake.NewSimpleDynamicClient(scheme.Scheme, tc.InitialState...)
baseContext := context.WithValue(context.Background(), dynamicclient.Key{}, dynClient)
ctx, cancel := context.WithCancel(baseContext)

r := &reconciler{
client: c,
scheme: tc.Scheme,
recorder: recorder,
client: c,
scheme: tc.Scheme,
recorder: recorder,
sinkResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}),
}
if err := r.InjectClient(c); err != nil {
t.Errorf("cannot inject client: %v", zap.Error(err))
}

t.Run(tc.Name, tc.Runner(t, r, c))
cancel()
}
}

Expand All @@ -297,10 +309,12 @@ func getSource() *sourcesv1alpha1.CamelSource {
},
},
},
Sink: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
Sink: &v1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
},
},
}
Expand Down Expand Up @@ -329,10 +343,12 @@ func getCamelKSource() *sourcesv1alpha1.CamelSource {
},
},
},
Sink: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
Sink: &v1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
},
},
}
Expand Down Expand Up @@ -364,10 +380,12 @@ func getCamelKFlowSource() *sourcesv1alpha1.CamelSource {
Source: sourcesv1alpha1.CamelSourceOriginSpec{
Flow: &flow,
},
Sink: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
Sink: &v1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: addressableName,
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
},
},
}
Expand Down Expand Up @@ -404,7 +422,7 @@ func makeContext(namespace string, image string) *camelv1alpha1.IntegrationKit {
GenerateName: "ctx-",
Namespace: namespace,
Labels: map[string]string{
"app": "camel-k",
"app": "camel-k",
"camel.apache.org/kit.type": camelv1alpha1.IntegrationKitTypeExternal,
},
},
Expand All @@ -421,10 +439,6 @@ func getRunningIntegration(t *testing.T) *camelv1alpha1.Integration {
Name: sourceName,
Namespace: testNS,
SinkURL: addressableURI,
SinkType: metav1.TypeMeta{
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
Source: sourcesv1alpha1.CamelSourceOriginSpec{
Flow: &sourcesv1alpha1.Flow{
"from": map[string]interface{}{
Expand All @@ -451,10 +465,6 @@ func getRunningCamelKIntegration(t *testing.T) *camelv1alpha1.Integration {
Name: sourceName,
Namespace: testNS,
SinkURL: addressableURI,
SinkType: metav1.TypeMeta{
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
Source: sourcesv1alpha1.CamelSourceOriginSpec{
Integration: &camelv1alpha1.IntegrationSpec{
Sources: []camelv1alpha1.SourceSpec{
Expand All @@ -481,10 +491,6 @@ func getRunningCamelKFlowIntegration(t *testing.T) *camelv1alpha1.Integration {
Name: sourceName,
Namespace: testNS,
SinkURL: addressableURI,
SinkType: metav1.TypeMeta{
Kind: addressableKind,
APIVersion: addressableAPIVersion,
},
Source: sourcesv1alpha1.CamelSourceOriginSpec{
Integration: &camelv1alpha1.IntegrationSpec{
Sources: []camelv1alpha1.SourceSpec{
Expand Down Expand Up @@ -569,7 +575,7 @@ func getAddressable() *unstructured.Unstructured {
},
"status": map[string]interface{}{
"address": map[string]interface{}{
"hostname": addressableDNS,
"url": addressableURI,
},
},
},
Expand Down
Loading

0 comments on commit 302c5e0

Please sign in to comment.