Skip to content

Commit

Permalink
Add persistent cache store backed by ConfigMap. Refactor mtping (#3451)
Browse files Browse the repository at this point in the history
* add persistent store backed by configmap

* adapter support watching configmap. Disabled by default

* cleanup, add more tests

* fix ut, and maybe it

* Setting up informers in now an opt-in flag, for backward compatibility reason

* more tests

* changes from review and fix configmap label

* filter resource-scoped pingsources (yeah still there)

* really delete schedule

* fix crash when configmap does not exist

* add more tests

* fix broken k8s event
  • Loading branch information
lionelvillard committed Jul 17, 2020
1 parent 6caed6a commit 1cbaa24
Show file tree
Hide file tree
Showing 22 changed files with 1,306 additions and 563 deletions.
19 changes: 2 additions & 17 deletions cmd/apiserver_receive_adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ limitations under the License.
package main

import (
"context"
"fmt"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/adapter/apiserver"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
)

const (
Expand All @@ -34,16 +29,6 @@ const (

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Start the injection clients and informers.
go func(ctx context.Context) {
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
panic(fmt.Sprintf("Failed to start informers - %s", err))
}
<-ctx.Done()
}(ctx)

ctx = adapter.WithInjectorEnabled(ctx)
adapter.MainWithContext(ctx, component, apiserver.NewEnvConfig, apiserver.NewAdapter)
}
25 changes: 7 additions & 18 deletions cmd/mtping/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,19 @@ limitations under the License.
package main

import (
"context"
"fmt"

"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/adapter/mtping"
"knative.dev/eventing/pkg/adapter/v2"
)

const (
component = "pingsource-mt-adapter"
)

func main() {
ctx := signals.NewContext()
cfg := sharedmain.ParseAndGetConfigOrDie()
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

// Start the injection clients and informers.
go func(ctx context.Context) {
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
panic(fmt.Sprintf("Failed to start informers - %s", err))
}
<-ctx.Done()
}(ctx)

adapter.MainWithContext(ctx, "pingsource-mt-adapter", mtping.NewEnvConfig, mtping.NewAdapter)
ctx = adapter.WithConfigMapWatcherEnabled(ctx)
ctx = adapter.WithInjectorEnabled(ctx)
adapter.MainWithContext(ctx, component, mtping.NewEnvConfig, mtping.NewAdapter)
}
1 change: 1 addition & 0 deletions config/core/roles/controller-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rules:
- "endpoints"
- "events"
- "serviceaccounts"
- "pods"
verbs: &everything
- "get"
- "list"
Expand Down
16 changes: 0 additions & 16 deletions config/core/roles/pingsource-mt-adapter-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,6 @@ rules:
- "get"
- "list"
- "watch"
- apiGroups:
- sources.knative.dev
resources:
- pingsources
- pingsources/status
verbs:
- get
- list
- watch
- patch
- apiGroups:
- sources.knative.dev
resources:
- pingsources/finalizers
verbs:
- "patch"
- apiGroups:
- ""
resources:
Expand Down
24 changes: 12 additions & 12 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"knative.dev/pkg/controller"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
Expand All @@ -30,32 +30,32 @@ import (
// mtpingAdapter implements the PingSource mt adapter to sinks
type mtpingAdapter struct {
logger *zap.SugaredLogger
client cloudevents.Client
runner *cronJobsRunner
}

func NewEnvConfig() adapter.EnvConfigAccessor {
return &adapter.EnvConfig{}
}

func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx))

cmw := adapter.ConfigMapWatcherFromContext(ctx)
cmw.Watch("config-pingsource-mt-adapter", runner.updateFromConfigMap)

return &mtpingAdapter{
logger: logging.FromContext(ctx),
client: ceClient,
runner: runner,
}
}

// Start implements adapter.Adapter
func (a *mtpingAdapter) Start(ctx context.Context) error {
runner := NewCronJobsRunner(a.client, a.logger)

ctrl := NewController(ctx, runner)

a.logger.Info("Starting controllers...")
go controller.StartAll(ctx, ctrl)

a.logger.Info("Starting job runner...")
runner.Start(ctx.Done())
if err := a.runner.Start(ctx.Done()); err != nil {
return err
}

a.logger.Infof("controller and runner stopped")
a.logger.Infof("runner stopped")
return nil
}
11 changes: 9 additions & 2 deletions pkg/adapter/mtping/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@ import (
"testing"
"time"

adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
_ "knative.dev/pkg/client/injection/kube/client/fake"
rectesting "knative.dev/pkg/reconciler/testing"

pkgadapter "knative.dev/eventing/pkg/adapter/v2"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
)

func TestStartStopAdapter(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
ctx, cancel := context.WithCancel(ctx)
cmw := pkgadapter.SetupConfigMapWatchOrDie(ctx, "component", "test-ns")
ctx = pkgadapter.WithConfigMapWatcher(ctx, cmw)

envCfg := NewEnvConfig()

ce := adaptertest.NewTestClient()
adapter := NewAdapter(ctx, envCfg, ce)

ctx, cancel := context.WithCancel(ctx)
done := make(chan bool)
go func(ctx context.Context) {
err := adapter.Start(ctx)
Expand Down
74 changes: 74 additions & 0 deletions pkg/adapter/mtping/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
corev1 "k8s.io/api/core/v1"

"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
)

type PingConfig struct {
corev1.ObjectReference `json:",inline"`

// Schedule is the cronjob schedule. Defaults to `* * * * *`.
Schedule string `json:"schedule"`

// JsonData is json encoded data used as the body of the event posted to
// the sink. Default is empty. If set, datacontenttype will also be set
// to "application/json".
// +optional
JsonData string `json:"jsonData,omitempty"`

// Extensions specify what attribute are added or overridden on the
// outbound event. Each `Extensions` key-value pair are set on the event as
// an attribute extension independently.
// +optional
Extensions map[string]string `json:"extensions,omitempty"`

// SinkURI is the current active sink URI that has been configured for the
// Source.
SinkURI string `json:"sinkUri,omitempty"`
}

type PingConfigs map[string]PingConfig

// Project creates a PingConfig for the given source
func Project(i interface{}) interface{} {
obj := i.(*v1alpha2.PingSource)

if scope, ok := obj.Annotations[eventing.ScopeAnnotationKey]; ok && scope != eventing.ScopeCluster {
return nil
}

cfg := &PingConfig{
ObjectReference: corev1.ObjectReference{
Name: obj.Name,
Namespace: obj.Namespace,
UID: obj.UID,
ResourceVersion: obj.ResourceVersion,
},
Schedule: obj.Spec.Schedule,
JsonData: obj.Spec.JsonData,
SinkURI: obj.Status.SinkURI.String(),
}
if obj.Spec.CloudEventOverrides != nil {
cfg.Extensions = obj.Spec.CloudEventOverrides.Extensions
}
return cfg
}
107 changes: 107 additions & 0 deletions pkg/adapter/mtping/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
"testing"

"knative.dev/pkg/apis"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2"
)

func TestProject(t *testing.T) {
testCases := map[string]struct {
source sourcesv1alpha2.PingSource
expected PingConfig
}{
"TestAddRunRemoveSchedule": {
source: sourcesv1alpha2.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-name",
Namespace: "test-ns"},

Spec: sourcesv1alpha2.PingSourceSpec{
SourceSpec: duckv1.SourceSpec{
CloudEventOverrides: nil,
},
Schedule: "* * * * ?",
JsonData: "some data",
},
Status: sourcesv1alpha2.PingSourceStatus{
duckv1.SourceStatus{
SinkURI: &apis.URL{
Host: "asink",
},
},
},
},
expected: PingConfig{
ObjectReference: corev1.ObjectReference{
Name: "test-name",
Namespace: "test-ns",
},
Schedule: "* * * * ?",
JsonData: "some data",
SinkURI: "//asink",
}},
"TestAddRunRemoveScheduleWithExtensionOverride": {
source: sourcesv1alpha2.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: "test-name",
Namespace: "test-ns"},
Spec: sourcesv1alpha2.PingSourceSpec{
SourceSpec: duckv1.SourceSpec{
Sink: duckv1.Destination{},
CloudEventOverrides: &duckv1.CloudEventOverrides{
Extensions: map[string]string{"1": "one", "2": "two"},
},
},
Schedule: "* * * * ?",
JsonData: "some data",
},
Status: sourcesv1alpha2.PingSourceStatus{
duckv1.SourceStatus{
SinkURI: &apis.URL{Host: "anothersink"},
},
},
},
expected: PingConfig{
ObjectReference: corev1.ObjectReference{
Name: "test-name",
Namespace: "test-ns",
},
Schedule: "* * * * ?",
JsonData: "some data",
Extensions: map[string]string{"1": "one", "2": "two"},
SinkURI: "//anothersink",
}},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
got := Project(&tc.source)
if diff := cmp.Diff(&tc.expected, got); diff != "" {
t.Errorf("unexpected projection (-want, +got) = %v", diff)
}
})
}
}
Loading

0 comments on commit 1cbaa24

Please sign in to comment.