Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for service-mirror selectors #4795

Merged
merged 2 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 16 additions & 250 deletions cli/cmd/multicluster.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cmd

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -25,8 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/helm/pkg/chartutil"
Expand Down Expand Up @@ -72,11 +68,7 @@ type (
logLevel string
controlPlaneVersion string
dockerRegistry string
}

exportServiceOptions struct {
gatewayNamespace string
gatewayName string
selector string
}

gatewaysOptions struct {
Expand Down Expand Up @@ -117,6 +109,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) {
dockerRegistry: defaultDockerRegistry,
serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit,
logLevel: defaults.LogLevel,
selector: k8s.DefaultExportedServiceSelector,
}, nil
}

Expand Down Expand Up @@ -517,11 +510,6 @@ func newLinkCommand() *cobra.Command {
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName),
Namespace: opts.namespace,
Annotations: map[string]string{
k8s.RemoteClusterNameLabel: opts.clusterName,
k8s.RemoteClusterDomainAnnotation: configMap.Global.ClusterDomain,
k8s.RemoteClusterLinkerdNamespaceAnnotation: controlPlaneNamespace,
},
},
Data: map[string][]byte{
k8s.ConfigKeyName: kubeconfig,
Expand Down Expand Up @@ -561,6 +549,11 @@ func newLinkCommand() *cobra.Command {
return err
}

selector, err := metav1.ParseToLabelSelector(opts.selector)
if err != nil {
return err
}

link := mc.Link{
Name: opts.clusterName,
Namespace: opts.namespace,
Expand All @@ -572,9 +565,14 @@ func newLinkCommand() *cobra.Command {
GatewayPort: gatewayPort,
GatewayIdentity: gatewayIdentity,
ProbeSpec: probeSpec,
Selector: *selector,
}

linkOut, err := yaml.Marshal(link.ToUnstructured().Object)
obj, err := link.ToUnstructured()
if err != nil {
return err
}
linkOut, err := yaml.Marshal(obj.Object)
if err != nil {
return err
}
Expand Down Expand Up @@ -630,229 +628,7 @@ func newLinkCommand() *cobra.Command {
cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried")
cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components")
cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, "Docker registry to pull service mirror controller image from")

return cmd
}

type exportReport struct {
resourceKind string
resourceName string
exported bool
}

func transform(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) {
var metaType metav1.TypeMeta

if err := yaml.Unmarshal(bytes, &metaType); err != nil {
return nil, nil, err
}

if metaType.Kind == "Service" {
var service corev1.Service
if err := yaml.Unmarshal(bytes, &service); err != nil {
return nil, nil, err
}

if service.Annotations == nil {
service.Annotations = map[string]string{}
}
report := &exportReport{
resourceKind: strings.ToLower(metaType.Kind),
resourceName: service.Name,
}

if service.Labels != nil {
if _, isMirroredResource := service.Labels[k8s.MirroredResourceLabel]; isMirroredResource {
report.exported = false
return bytes, []*exportReport{report}, nil
}
}

service.Annotations[k8s.GatewayNameAnnotation] = gatewayName
service.Annotations[k8s.GatewayNsAnnotation] = gatewayNamespace

transformed, err := yaml.Marshal(service)

if err != nil {
return nil, nil, err
}
report.exported = true
return transformed, []*exportReport{report}, nil
}

report := &exportReport{
resourceKind: strings.ToLower(metaType.Kind),
exported: false,
}

return bytes, []*exportReport{report}, nil
}

func generateReport(reports []*exportReport, reportsOut io.Writer) error {
unexportedResources := map[string]int{}

for _, r := range reports {
if r.exported {
if _, err := reportsOut.Write([]byte(fmt.Sprintf("%s \"%s\" exported\n", r.resourceKind, r.resourceName))); err != nil {
return err
}
} else {
if val, ok := unexportedResources[r.resourceKind]; ok {
unexportedResources[r.resourceKind] = val + 1
} else {
unexportedResources[r.resourceKind] = 1
}
}
}

if len(unexportedResources) > 0 {
reportsOut.Write([]byte("\n"))
reportsOut.Write([]byte("Number of skipped resources:\n"))
}

for res, num := range unexportedResources {
reportsOut.Write([]byte(fmt.Sprintf("%ss: %d\n", res, num)))
}

return nil
}

func transformList(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) {
var sourceList corev1.List
if err := yaml.Unmarshal(bytes, &sourceList); err != nil {
return nil, nil, err
}

reports := []*exportReport{}
items := []runtime.RawExtension{}

for _, item := range sourceList.Items {
result, report, err := transform(item.Raw, gatewayName, gatewayNamespace)
if err != nil {
return nil, nil, err
}

exported, err := yaml.YAMLToJSON(result)
if err != nil {
return nil, nil, err
}

items = append(items, runtime.RawExtension{Raw: exported})
reports = append(reports, report...)
}

sourceList.Items = items
result, err := yaml.Marshal(sourceList)
if err != nil {
return nil, nil, err
}
return result, reports, nil
}

func processExportYaml(in io.Reader, out io.Writer, gatewayName, gatewayNamespace string) ([]*exportReport, error) {
reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))
var reports []*exportReport
// Iterate over all YAML objects in the input
for {
// Read a single YAML object
bytes, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}

isList, err := kindIsList(bytes)
if err != nil {
return nil, err
}

var result []byte
var currentReports []*exportReport

if isList {
result, currentReports, err = transformList(bytes, gatewayName, gatewayNamespace)

} else {
result, currentReports, err = transform(bytes, gatewayName, gatewayNamespace)
}

if err != nil {
return nil, err
}

reports = append(reports, currentReports...)
out.Write(result)
out.Write([]byte("---\n"))
}

return reports, nil
}

func transformExportInput(inputs []io.Reader, errWriter, outWriter io.Writer, gatewayName, gatewayNamespace string) int {
postTransformBuf := &bytes.Buffer{}
reportBuf := &bytes.Buffer{}
var finalReports []*exportReport
for _, input := range inputs {
reports, err := processExportYaml(input, postTransformBuf, gatewayName, gatewayNamespace)
if err != nil {
fmt.Fprintf(errWriter, "Error transforming resources: %v\n", err)
return 1
}
_, err = io.Copy(outWriter, postTransformBuf)

if err != nil {
fmt.Fprintf(errWriter, "Error printing YAML: %v\n", err)
return 1
}

finalReports = append(finalReports, reports...)
}

// print error report after yaml output, for better visibility
if err := generateReport(finalReports, reportBuf); err != nil {
fmt.Fprintf(errWriter, "Error generating reports: %v\n", err)
return 1
}
errWriter.Write([]byte("\n"))
io.Copy(errWriter, reportBuf)
errWriter.Write([]byte("\n"))
return 0
}

func newExportServiceCommand() *cobra.Command {
opts := exportServiceOptions{}

cmd := &cobra.Command{
Use: "export-service",
Short: "Exposes a service to be mirrored",
RunE: func(cmd *cobra.Command, args []string) error {

if len(args) < 1 {
return fmt.Errorf("please specify a kubernetes resource file")
}

if opts.gatewayName == "" {
return errors.New("The --gateway-name flag needs to be set")
}

if opts.gatewayNamespace == "" {
return errors.New("The --gateway-namespace flag needs to be set")
}

in, err := read(args[0])
if err != nil {
return err
}
exitCode := transformExportInput(in, stderr, stdout, opts.gatewayName, opts.gatewayNamespace)
os.Exit(exitCode)
return nil
},
}

cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", "linkerd-gateway", "the name of the gateway")
cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "the namespace of the gateway")
cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror")

return cmd
}
Expand All @@ -870,24 +646,14 @@ This command provides subcommands to manage the multicluster support
functionality of Linkerd. You can use it to install the service mirror
components on a cluster, manage credentials and link clusters together.`,
Example: ` # Install multicluster addons.
linkerd --context=cluster-a cluster install | kubectl --context=cluster-a apply -f -
linkerd --context=cluster-a multicluster install | kubectl --context=cluster-a apply -f -

# Extract mirroring cluster credentials from cluster A and install them on cluster B
linkerd --context=cluster-a cluster link --cluster-name=target | kubectl apply --context=cluster-b -f -

# Export services from cluster to be available to other clusters
kubectl get svc -o yaml | linkerd export-service - | kubectl apply -f -

# Exporting a file from a remote URL
linkerd export-service http://url.to/yml | kubectl apply -f -

# Exporting all the resources inside a folder and its sub-folders.
linkerd export-service <folder> | kubectl apply -f -`,
linkerd --context=cluster-a multicluster link --cluster-name=target | kubectl apply --context=cluster-b -f -`,
}

multiclusterCmd.AddCommand(newLinkCommand())
multiclusterCmd.AddCommand(newMulticlusterInstallCommand())
multiclusterCmd.AddCommand(newExportServiceCommand())
multiclusterCmd.AddCommand(newGatewaysCommand())
multiclusterCmd.AddCommand(newAllowCommand())
return multiclusterCmd
Expand Down
8 changes: 1 addition & 7 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
// metrics labels
service = "service"
namespace = "namespace"
targetGatewayNamespace = "target_gateway_namespace"
targetGateway = "target_gateway"
targetCluster = "target_cluster"
targetService = "target_service"
targetServiceNamespace = "target_service_namespace"
Expand Down Expand Up @@ -669,16 +667,12 @@ func metricLabels(resource interface{}) map[string]string {

labels := map[string]string{service: serviceName, namespace: ns}

gateway, hasRemoteGateway := resLabels[consts.RemoteGatewayNameLabel]
gatewayNs, hasRemoteGatwayNs := resLabels[consts.RemoteGatewayNsLabel]
remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]

if hasRemoteGateway && hasRemoteGatwayNs && hasRemoteClusterName && hasServiceFqn {
if hasRemoteClusterName && hasServiceFqn {
// this means we are looking at Endpoints created for the purpose of mirroring
// an out of cluster service.
labels[targetGatewayNamespace] = gatewayNs
labels[targetGateway] = gateway
labels[targetCluster] = remoteClusterName

fqParts := strings.Split(serviceFqn, ".")
Expand Down
Loading