Skip to content

Commit

Permalink
Extract reuseable part of sink flag
Browse files Browse the repository at this point in the history
  • Loading branch information
cardil committed Sep 16, 2024
1 parent fd0126d commit 0c0828b
Show file tree
Hide file tree
Showing 8 changed files with 561 additions and 211 deletions.
2 changes: 1 addition & 1 deletion docs/cmd/kn_source_ping_create.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ kn source ping create NAME --sink SINK
```
# Create a Ping source 'my-ping' which fires every two minutes and sends '{ value: "hello" }' to service 'mysvc' as a cloudevent
kn source ping create my-ping --schedule "*/2 * * * *" --data '{ value: "hello" }' --sink ksvc:mysvc
kn source ping create my-ping --schedule "*/2 * * * *" --data '{ value: "hello" }' --sink mysvc
```

### Options
Expand Down
191 changes: 58 additions & 133 deletions pkg/commands/flags/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ package flags

import (
"context"
"fmt"
"strings"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/client/pkg/config"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

clientdynamic "knative.dev/client/pkg/dynamic"
"knative.dev/client/pkg/flags/sink"
"knative.dev/client/pkg/util/errors"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

// SinkFlags holds information about given sink together with optional mappings
// to allow ease of referencing the common types.
type SinkFlags struct {
Sink string
SinkMappings map[string]schema.GroupVersionResource
Expand All @@ -42,156 +42,81 @@ func NewSinkFlag(mapping map[string]schema.GroupVersionResource) *SinkFlags {
}

// AddWithFlagName configures Sink flag with given flag name and a short flag name
// pass empty short flag name if you don't want to set one
// pass empty short flag name if you don't want to set one.
func (i *SinkFlags) AddWithFlagName(cmd *cobra.Command, fname, short string) {
flag := "--" + fname
i.AddToFlagSet(cmd.Flags(), fname, short)
}

// AddToFlagSet configures Sink flag with given flag name and a short flag name
// pass empty short flag name if you don't want to set one
func (i *SinkFlags) AddToFlagSet(fs *pflag.FlagSet, fname, short string) {
if short == "" {
cmd.Flags().StringVar(&i.Sink, fname, "", "")
fs.StringVar(&i.Sink, fname, "", "")
} else {
cmd.Flags().StringVarP(&i.Sink, fname, short, "", "")
fs.StringVarP(&i.Sink, fname, short, "", "")
}
cmd.Flag(fname).Usage = "Addressable sink for events. " +
"You can specify a broker, channel, Knative service or URI. " +
"Examples: '" + flag + " broker:nest' for a broker 'nest', " +
"'" + flag + " channel:pipe' for a channel 'pipe', " +
"'" + flag + " ksvc:mysvc:mynamespace' for a Knative service 'mysvc' in another namespace 'mynamespace', " +
"'" + flag + " https://event.receiver.uri' for an HTTP URI, " +
"'" + flag + " ksvc:receiver' or simply '" + flag + " receiver' for a Knative service 'receiver' in the current namespace. " +
"'" + flag + " special.eventing.dev/v1alpha1/channels:pipe' for GroupVersionResource of v1alpha1 'pipe'. " +
"If a prefix is not provided, it is considered as a Knative service in the current namespace."
// Use default mapping if empty
if i.SinkMappings == nil {
i.SinkMappings = defaultSinkMappings
fs.Lookup(fname).Usage = sink.Usage(fname)
}

// Add configures Sink flag with name 'Sink' amd short name 's'
func (i *SinkFlags) Add(cmd *cobra.Command) {
i.AddWithFlagName(cmd, sink.DefaultFlagName, sink.DefaultFlagShorthand)
}

// WithDefaultMappings will return a copy of SinkFlags with provided mappings
// and the default ones.
func (i *SinkFlags) WithDefaultMappings() *SinkFlags {
sf := &SinkFlags{
Sink: i.Sink,
SinkMappings: make(map[string]schema.GroupVersionResource,
len(i.SinkMappings)+len(sink.DefaultMappings)),
}
for k, v := range sink.DefaultMappings {
sf.SinkMappings[k] = v
}
for k, v := range i.SinkMappings {
sf.SinkMappings[k] = v
}
for _, p := range config.GlobalConfig.SinkMappings() {
//user configuration might override the default configuration
i.SinkMappings[p.Prefix] = schema.GroupVersionResource{
// user configuration might override the default configuration
sf.SinkMappings[p.Prefix] = schema.GroupVersionResource{
Resource: p.Resource,
Group: p.Group,
Version: p.Version,
}
}
return sf
}

// Add configures Sink flag with name 'Sink' amd short name 's'
func (i *SinkFlags) Add(cmd *cobra.Command) {
i.AddWithFlagName(cmd, "sink", "s")
}

// SinkPrefixes maps prefixes used for sinks to their GroupVersionResources.
var defaultSinkMappings = map[string]schema.GroupVersionResource{
"broker": {
Resource: "brokers",
Group: "eventing.knative.dev",
Version: "v1",
},
// Shorthand alias for service
"ksvc": {
Resource: "services",
Group: "serving.knative.dev",
Version: "v1",
},
"channel": {
Resource: "channels",
Group: "messaging.knative.dev",
Version: "v1",
},
// Parse returns the sink reference, which may refer to URL or to Kubernetes
// resource. The namespace given should be the current namespace within the
// context.
func (i *SinkFlags) Parse(namespace string) (*sink.Reference, error) {
// Use default mapping if empty
sf := i.WithDefaultMappings()
return sink.Parse(sf.Sink, namespace, sf.SinkMappings)
}

// ResolveSink returns the Destination referred to by the flags in the acceptor.
// It validates that any object the user is referring to exists.
func (i *SinkFlags) ResolveSink(ctx context.Context, knclient clientdynamic.KnDynamicClient, namespace string) (*duckv1.Destination, error) {
client := knclient.RawClient()
if i.Sink == "" {
return nil, nil
}
// Use default mapping if empty
if i.SinkMappings == nil {
i.SinkMappings = defaultSinkMappings
}
prefix, name, ns := parseSink(i.Sink)
if prefix == "" {
// URI target
uri, err := apis.ParseURL(name)
if err != nil {
return nil, err
}
return &duckv1.Destination{URI: uri}, nil
}
gvr, ok := i.SinkMappings[prefix]
if !ok {
if prefix == "svc" || prefix == "service" {
return nil, fmt.Errorf("unsupported Sink prefix: '%s', please use prefix 'ksvc' for knative service", prefix)
}
idx := strings.LastIndex(prefix, "/")
var groupVersion string
var kind string
if idx != -1 && idx < len(prefix)-1 {
groupVersion, kind = prefix[:idx], prefix[idx+1:]
} else {
kind = prefix
}
parsedVersion, err := schema.ParseGroupVersion(groupVersion)
if err != nil {
return nil, err
}

// For the RAWclient the resource name must be in lower case plural form.
// This is the best effort to sanitize the inputs, but the safest way is to provide
// the appropriate form in user's input.
if !strings.HasSuffix(kind, "s") {
kind = kind + "s"
}
kind = strings.ToLower(kind)
gvr = parsedVersion.WithResource(kind)
}
if ns != "" {
namespace = ns
}
obj, err := client.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
s, err := i.Parse(namespace)
if err != nil {
return nil, err
}

destination := &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: obj.GetKind(),
APIVersion: obj.GetAPIVersion(),
Name: obj.GetName(),
Namespace: namespace,
},
}
return destination, nil
}

// parseSink takes the string given by the user into the prefix, name and namespace of
// the object. If the user put a URI instead, the prefix is empty and the name
// is the whole URI.
func parseSink(sink string) (string, string, string) {
parts := strings.SplitN(sink, ":", 3)
switch {
case len(parts) == 1:
return "ksvc", parts[0], ""
case parts[0] == "http" || parts[0] == "https":
return "", sink, ""
case len(parts) == 3:
return parts[0], parts[1], parts[2]
default:
return parts[0], parts[1], ""
var dest *duckv1.Destination
dest, err = s.Resolve(ctx, knclient)
if err != nil {
// Returning original error that caused sink.ErrSinkIsInvalid as it is
// directly presented to the end-user.
return nil, errors.CauseOf(err, sink.ErrSinkIsInvalid)
}
return dest, nil
}

// SinkToString prepares a Sink for list output
func SinkToString(sink duckv1.Destination) string {
if sink.Ref != nil {
if sink.Ref.Kind == "Service" && strings.HasPrefix(sink.Ref.APIVersion, defaultSinkMappings["ksvc"].Group) {
return fmt.Sprintf("ksvc:%s", sink.Ref.Name)
} else {
return fmt.Sprintf("%s:%s", strings.ToLower(sink.Ref.Kind), sink.Ref.Name)
}
}
if sink.URI != nil {
return sink.URI.String()
}
return ""
// Deprecated: use (*sink.Reference).AsText instead.
func SinkToString(dest duckv1.Destination) string {
ref := sink.GuessFromDestination(dest)
return ref.String()
}
Loading

0 comments on commit 0c0828b

Please sign in to comment.