diff --git a/cli/cmd/multicluster.go b/cli/cmd/multicluster.go index 4062c876ce669..4d2c4d694cd80 100644 --- a/cli/cmd/multicluster.go +++ b/cli/cmd/multicluster.go @@ -1,8 +1,6 @@ package cmd import ( - "bufio" - "bytes" "context" "errors" "fmt" @@ -25,8 +23,6 @@ import ( corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - yamlDecoder "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd/api" "k8s.io/helm/pkg/chartutil" @@ -72,11 +68,7 @@ type ( logLevel string controlPlaneVersion string dockerRegistry string - } - - exportServiceOptions struct { - gatewayNamespace string - gatewayName string + selector string } gatewaysOptions struct { @@ -117,6 +109,7 @@ func newLinkOptionsWithDefault() (*linkOptions, error) { dockerRegistry: defaultDockerRegistry, serviceMirrorRetryLimit: defaults.ServiceMirrorRetryLimit, logLevel: defaults.LogLevel, + selector: k8s.DefaultExportedServiceSelector, }, nil } @@ -517,11 +510,6 @@ func newLinkCommand() *cobra.Command { ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("cluster-credentials-%s", opts.clusterName), Namespace: opts.namespace, - Annotations: map[string]string{ - k8s.RemoteClusterNameLabel: opts.clusterName, - k8s.RemoteClusterDomainAnnotation: configMap.Global.ClusterDomain, - k8s.RemoteClusterLinkerdNamespaceAnnotation: controlPlaneNamespace, - }, }, Data: map[string][]byte{ k8s.ConfigKeyName: kubeconfig, @@ -561,6 +549,11 @@ func newLinkCommand() *cobra.Command { return err } + selector, err := metav1.ParseToLabelSelector(opts.selector) + if err != nil { + return err + } + link := mc.Link{ Name: opts.clusterName, Namespace: opts.namespace, @@ -572,9 +565,14 @@ func newLinkCommand() *cobra.Command { GatewayPort: gatewayPort, GatewayIdentity: gatewayIdentity, ProbeSpec: probeSpec, + Selector: *selector, } - linkOut, err := yaml.Marshal(link.ToUnstructured().Object) + obj, err := link.ToUnstructured() + if err != nil { + return err + } + linkOut, err := yaml.Marshal(obj.Object) if err != nil { return err } @@ -630,229 +628,7 @@ func newLinkCommand() *cobra.Command { cmd.Flags().Uint32Var(&opts.serviceMirrorRetryLimit, "service-mirror-retry-limit", opts.serviceMirrorRetryLimit, "The number of times a failed update from the target cluster is allowed to be retried") cmd.Flags().StringVar(&opts.logLevel, "log-level", opts.logLevel, "Log level for the Multicluster components") cmd.Flags().StringVar(&opts.dockerRegistry, "registry", opts.dockerRegistry, "Docker registry to pull service mirror controller image from") - - return cmd -} - -type exportReport struct { - resourceKind string - resourceName string - exported bool -} - -func transform(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) { - var metaType metav1.TypeMeta - - if err := yaml.Unmarshal(bytes, &metaType); err != nil { - return nil, nil, err - } - - if metaType.Kind == "Service" { - var service corev1.Service - if err := yaml.Unmarshal(bytes, &service); err != nil { - return nil, nil, err - } - - if service.Annotations == nil { - service.Annotations = map[string]string{} - } - report := &exportReport{ - resourceKind: strings.ToLower(metaType.Kind), - resourceName: service.Name, - } - - if service.Labels != nil { - if _, isMirroredResource := service.Labels[k8s.MirroredResourceLabel]; isMirroredResource { - report.exported = false - return bytes, []*exportReport{report}, nil - } - } - - service.Annotations[k8s.GatewayNameAnnotation] = gatewayName - service.Annotations[k8s.GatewayNsAnnotation] = gatewayNamespace - - transformed, err := yaml.Marshal(service) - - if err != nil { - return nil, nil, err - } - report.exported = true - return transformed, []*exportReport{report}, nil - } - - report := &exportReport{ - resourceKind: strings.ToLower(metaType.Kind), - exported: false, - } - - return bytes, []*exportReport{report}, nil -} - -func generateReport(reports []*exportReport, reportsOut io.Writer) error { - unexportedResources := map[string]int{} - - for _, r := range reports { - if r.exported { - if _, err := reportsOut.Write([]byte(fmt.Sprintf("%s \"%s\" exported\n", r.resourceKind, r.resourceName))); err != nil { - return err - } - } else { - if val, ok := unexportedResources[r.resourceKind]; ok { - unexportedResources[r.resourceKind] = val + 1 - } else { - unexportedResources[r.resourceKind] = 1 - } - } - } - - if len(unexportedResources) > 0 { - reportsOut.Write([]byte("\n")) - reportsOut.Write([]byte("Number of skipped resources:\n")) - } - - for res, num := range unexportedResources { - reportsOut.Write([]byte(fmt.Sprintf("%ss: %d\n", res, num))) - } - - return nil -} - -func transformList(bytes []byte, gatewayName, gatewayNamespace string) ([]byte, []*exportReport, error) { - var sourceList corev1.List - if err := yaml.Unmarshal(bytes, &sourceList); err != nil { - return nil, nil, err - } - - reports := []*exportReport{} - items := []runtime.RawExtension{} - - for _, item := range sourceList.Items { - result, report, err := transform(item.Raw, gatewayName, gatewayNamespace) - if err != nil { - return nil, nil, err - } - - exported, err := yaml.YAMLToJSON(result) - if err != nil { - return nil, nil, err - } - - items = append(items, runtime.RawExtension{Raw: exported}) - reports = append(reports, report...) - } - - sourceList.Items = items - result, err := yaml.Marshal(sourceList) - if err != nil { - return nil, nil, err - } - return result, reports, nil -} - -func processExportYaml(in io.Reader, out io.Writer, gatewayName, gatewayNamespace string) ([]*exportReport, error) { - reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096)) - var reports []*exportReport - // Iterate over all YAML objects in the input - for { - // Read a single YAML object - bytes, err := reader.Read() - if err == io.EOF { - break - } - if err != nil { - return nil, err - } - - isList, err := kindIsList(bytes) - if err != nil { - return nil, err - } - - var result []byte - var currentReports []*exportReport - - if isList { - result, currentReports, err = transformList(bytes, gatewayName, gatewayNamespace) - - } else { - result, currentReports, err = transform(bytes, gatewayName, gatewayNamespace) - } - - if err != nil { - return nil, err - } - - reports = append(reports, currentReports...) - out.Write(result) - out.Write([]byte("---\n")) - } - - return reports, nil -} - -func transformExportInput(inputs []io.Reader, errWriter, outWriter io.Writer, gatewayName, gatewayNamespace string) int { - postTransformBuf := &bytes.Buffer{} - reportBuf := &bytes.Buffer{} - var finalReports []*exportReport - for _, input := range inputs { - reports, err := processExportYaml(input, postTransformBuf, gatewayName, gatewayNamespace) - if err != nil { - fmt.Fprintf(errWriter, "Error transforming resources: %v\n", err) - return 1 - } - _, err = io.Copy(outWriter, postTransformBuf) - - if err != nil { - fmt.Fprintf(errWriter, "Error printing YAML: %v\n", err) - return 1 - } - - finalReports = append(finalReports, reports...) - } - - // print error report after yaml output, for better visibility - if err := generateReport(finalReports, reportBuf); err != nil { - fmt.Fprintf(errWriter, "Error generating reports: %v\n", err) - return 1 - } - errWriter.Write([]byte("\n")) - io.Copy(errWriter, reportBuf) - errWriter.Write([]byte("\n")) - return 0 -} - -func newExportServiceCommand() *cobra.Command { - opts := exportServiceOptions{} - - cmd := &cobra.Command{ - Use: "export-service", - Short: "Exposes a service to be mirrored", - RunE: func(cmd *cobra.Command, args []string) error { - - if len(args) < 1 { - return fmt.Errorf("please specify a kubernetes resource file") - } - - if opts.gatewayName == "" { - return errors.New("The --gateway-name flag needs to be set") - } - - if opts.gatewayNamespace == "" { - return errors.New("The --gateway-namespace flag needs to be set") - } - - in, err := read(args[0]) - if err != nil { - return err - } - exitCode := transformExportInput(in, stderr, stdout, opts.gatewayName, opts.gatewayNamespace) - os.Exit(exitCode) - return nil - }, - } - - cmd.Flags().StringVar(&opts.gatewayName, "gateway-name", "linkerd-gateway", "the name of the gateway") - cmd.Flags().StringVar(&opts.gatewayNamespace, "gateway-namespace", defaultMulticlusterNamespace, "the namespace of the gateway") + cmd.Flags().StringVarP(&opts.selector, "selector", "l", opts.selector, "Selector (label query) to filter which services in the target cluster to mirror") return cmd } @@ -870,24 +646,14 @@ This command provides subcommands to manage the multicluster support functionality of Linkerd. You can use it to install the service mirror components on a cluster, manage credentials and link clusters together.`, Example: ` # Install multicluster addons. - linkerd --context=cluster-a cluster install | kubectl --context=cluster-a apply -f - + linkerd --context=cluster-a multicluster install | kubectl --context=cluster-a apply -f - # Extract mirroring cluster credentials from cluster A and install them on cluster B - linkerd --context=cluster-a cluster link --cluster-name=target | kubectl apply --context=cluster-b -f - - - # Export services from cluster to be available to other clusters - kubectl get svc -o yaml | linkerd export-service - | kubectl apply -f - - - # Exporting a file from a remote URL - linkerd export-service http://url.to/yml | kubectl apply -f - - - # Exporting all the resources inside a folder and its sub-folders. - linkerd export-service | kubectl apply -f -`, + linkerd --context=cluster-a multicluster link --cluster-name=target | kubectl apply --context=cluster-b -f -`, } multiclusterCmd.AddCommand(newLinkCommand()) multiclusterCmd.AddCommand(newMulticlusterInstallCommand()) - multiclusterCmd.AddCommand(newExportServiceCommand()) multiclusterCmd.AddCommand(newGatewaysCommand()) multiclusterCmd.AddCommand(newAllowCommand()) return multiclusterCmd diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 68df50243cd7c..225aff435986a 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -25,8 +25,6 @@ const ( // metrics labels service = "service" namespace = "namespace" - targetGatewayNamespace = "target_gateway_namespace" - targetGateway = "target_gateway" targetCluster = "target_cluster" targetService = "target_service" targetServiceNamespace = "target_service_namespace" @@ -669,16 +667,12 @@ func metricLabels(resource interface{}) map[string]string { labels := map[string]string{service: serviceName, namespace: ns} - gateway, hasRemoteGateway := resLabels[consts.RemoteGatewayNameLabel] - gatewayNs, hasRemoteGatwayNs := resLabels[consts.RemoteGatewayNsLabel] remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel] serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName] - if hasRemoteGateway && hasRemoteGatwayNs && hasRemoteClusterName && hasServiceFqn { + if hasRemoteClusterName && hasServiceFqn { // this means we are looking at Endpoints created for the purpose of mirroring // an out of cluster service. - labels[targetGatewayNamespace] = gatewayNs - labels[targetGateway] = gateway labels[targetCluster] = remoteClusterName fqParts := strings.Split(serviceFqn, ".") diff --git a/controller/cmd/service-mirror/cluster_watcher.go b/controller/cmd/service-mirror/cluster_watcher.go index 890b34cfea9e1..b5ba2655767a6 100644 --- a/controller/cmd/service-mirror/cluster_watcher.go +++ b/controller/cmd/service-mirror/cluster_watcher.go @@ -446,13 +446,13 @@ func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ev *RemoteSe return nil } -func isExportedService(annotations map[string]string) bool { - if annotations != nil { - _, hasGtwName := annotations[consts.GatewayNameAnnotation] - _, hasGtwNs := annotations[consts.GatewayNsAnnotation] - return hasGtwName && hasGtwNs +func (rcsw *RemoteClusterServiceWatcher) isExportedService(service *corev1.Service) bool { + selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector) + if err != nil { + rcsw.log.Errorf("Invalid service selector: %s", err) + return false } - return false + return selector.Matches(labels.Set(service.Labels)) } // this method is common to both CREATE and UPDATE because if we have been @@ -461,7 +461,7 @@ func isExportedService(annotations map[string]string) bool { func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error { localName := rcsw.mirroredResourceName(service.Name) - if isExportedService(service.Annotations) { + if rcsw.isExportedService(service) { localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName) if err != nil { if kerrors.IsNotFound(err) { @@ -518,7 +518,7 @@ func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() ([]*corev1.Service, } func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) { - if isExportedService(service.Annotations) { + if rcsw.isExportedService(service) { rcsw.eventsQueue.Add(&RemoteServiceDeleted{ Name: service.Name, Namespace: service.Namespace, diff --git a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go index 758b7b7bfb1e2..7850c92de12fb 100644 --- a/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go +++ b/controller/cmd/service-mirror/cluster_watcher_mirroring_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + consts "github.com/linkerd/linkerd2/pkg/k8s" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" @@ -84,21 +85,9 @@ func (tc *mirroringTestCase) run(t *testing.T) { func TestRemoteServiceCreatedMirroring(t *testing.T) { for _, tt := range []mirroringTestCase{ - { - description: "does not create service and endpoints when gateway address is missing", - environment: serviceCreateWithMissingGateway, - expectedLocalServices: []*corev1.Service{}, - expectedLocalEndpoints: []*corev1.Endpoints{}, - }, - { - description: "does not create service and endpoints when gateway spec is wrong", - environment: createServiceWrongGatewaySpec, - expectedLocalServices: []*corev1.Service{}, - expectedLocalEndpoints: []*corev1.Endpoints{}, - }, { description: "create service and endpoints when gateway can be resolved", - environment: createServiceOkeGatewaySpec, + environment: createExportedService, expectedLocalServices: []*corev1.Service{ mirrorService( "service-one-remote", @@ -142,7 +131,7 @@ func TestRemoteServiceDeletedMirroring(t *testing.T) { for _, tt := range []mirroringTestCase{ { description: "deletes locally mirrored service", - environment: deleteMirroredService, + environment: deleteMirrorService, }, } { tc := tt // pin @@ -235,7 +224,9 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { description: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType), environment: onAddOrUpdateExportedSvc(isAdd), expectedEventsInQueue: []interface{}{&RemoteServiceCreated{ - service: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil), + service: remoteService("test-service", "test-namespace", "resVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil), }}, }, { @@ -244,7 +235,9 @@ func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase { expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{ localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil), localEndpoints: endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil), - remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil), }}, expectedLocalServices: []*corev1.Service{ mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil), @@ -299,7 +292,7 @@ func TestOnDelete(t *testing.T) { for _, tt := range []mirroringTestCase{ { description: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service", - environment: onDeleteWithGatewayMetadata, + environment: onDeleteExportedService, expectedEventsInQueue: []interface{}{ &RemoteServiceDeleted{ Name: "test-service", @@ -309,7 +302,7 @@ func TestOnDelete(t *testing.T) { }, { description: "skips because there is no gateway metadata present on the service", - environment: onDeleteNoGatewayMetadata, + environment: onDeleteNonExportedService, expectedEventsInQueue: []interface{}{}, }, } { diff --git a/controller/cmd/service-mirror/cluster_watcher_test_util.go b/controller/cmd/service-mirror/cluster_watcher_test_util.go index a1f9ff24d630e..dfa8f3c90cddf 100644 --- a/controller/cmd/service-mirror/cluster_watcher_test_util.go +++ b/controller/cmd/service-mirror/cluster_watcher_test_util.go @@ -25,11 +25,14 @@ const ( defaultProbePeriod = 60 ) -var defaultProbeSpec = multicluster.ProbeSpec{ - Path: defaultProbePath, - Port: defaultProbePort, - Period: time.Duration(defaultProbePeriod) * time.Second, -} +var ( + defaultProbeSpec = multicluster.ProbeSpec{ + Path: defaultProbePath, + Port: defaultProbePort, + Period: time.Duration(defaultProbePeriod) * time.Second, + } + defaultSelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector) +) type testEnvironment struct { events []interface{} @@ -76,49 +79,12 @@ func (te *testEnvironment) runEnvironment(watcherQueue workqueue.RateLimitingInt return localAPI, nil } -var serviceCreateWithMissingGateway = &testEnvironment{ - events: []interface{}{ - &RemoteServiceCreated{ - service: remoteService("service-one", "ns1", "missing-gateway", "missing-namespace", "111", nil), - }, - }, - link: multicluster.Link{ - TargetClusterName: clusterName, - GatewayAddress: "", - }, -} - -var createServiceWrongGatewaySpec = &testEnvironment{ - events: []interface{}{ - &RemoteServiceCreated{ - service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", - "111", []corev1.ServicePort{ - { - Name: "port1", - Protocol: "TCP", - Port: 555, - }, - { - Name: "port2", - Protocol: "TCP", - Port: 666, - }, - }), - }, - }, - remoteResources: []string{ - gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-wrong", 888, "", 111, "/path", 666), - }, - link: multicluster.Link{ - TargetClusterName: clusterName, - GatewayAddress: "??????", - }, -} - -var createServiceOkeGatewaySpec = &testEnvironment{ +var createExportedService = &testEnvironment{ events: []interface{}{ &RemoteServiceCreated{ - service: remoteService("service-one", "ns1", "existing-gateway", "existing-namespace", "111", []corev1.ServicePort{ + service: remoteService("service-one", "ns1", "111", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, []corev1.ServicePort{ { Name: "port1", Protocol: "TCP", @@ -142,10 +108,11 @@ var createServiceOkeGatewaySpec = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } -var deleteMirroredService = &testEnvironment{ +var deleteMirrorService = &testEnvironment{ events: []interface{}{ &RemoteServiceDeleted{ Name: "test-service-remote-to-delete", @@ -163,13 +130,16 @@ var deleteMirroredService = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } var updateServiceWithChangedPorts = &testEnvironment{ events: []interface{}{ &RemoteServiceUpdated{ - remoteUpdate: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentServiceResVersion", []corev1.ServicePort{ + remoteUpdate: remoteService("test-service", "test-namespace", "currentServiceResVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, []corev1.ServicePort{ { Name: "port1", Protocol: "TCP", @@ -253,6 +223,7 @@ var updateServiceWithChangedPorts = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } @@ -282,7 +253,7 @@ var gcTriggered = &testEnvironment{ endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", nil), }, remoteResources: []string{ - remoteServiceAsYaml("test-service-1", "test-namespace", "gateway", "gateway-ns", "", nil), + remoteServiceAsYaml("test-service-1", "test-namespace", "", nil), }, link: multicluster.Link{ TargetClusterName: clusterName, @@ -292,7 +263,9 @@ var gcTriggered = &testEnvironment{ func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { return &testEnvironment{ events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "resVersion", nil)), + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "resVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil)), }, link: multicluster.Link{ TargetClusterName: clusterName, @@ -301,6 +274,7 @@ func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } @@ -309,7 +283,9 @@ func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment { func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { return &testEnvironment{ events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil)), }, localResources: []string{ mirrorServiceAsYaml("test-service-remote", "test-namespace", "pastResourceVersion", nil), @@ -322,6 +298,7 @@ func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } } @@ -329,7 +306,9 @@ func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment { func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { return &testEnvironment{ events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil)), + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil)), }, localResources: []string{ mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil), @@ -342,6 +321,7 @@ func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } } @@ -349,7 +329,7 @@ func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment { func serviceNotExportedAnymore(isAdd bool) *testEnvironment { return &testEnvironment{ events: []interface{}{ - onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "", "gateway-ns", "currentResVersion", nil)), + onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{}, nil)), }, localResources: []string{ mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil), @@ -362,14 +342,17 @@ func serviceNotExportedAnymore(isAdd bool) *testEnvironment { GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } } -var onDeleteWithGatewayMetadata = &testEnvironment{ +var onDeleteExportedService = &testEnvironment{ events: []interface{}{ &OnDeleteCalled{ - svc: remoteService("test-service", "test-namespace", "gateway", "gateway-ns", "currentResVersion", nil), + svc: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{ + consts.DefaultExportedServiceSelector: "true", + }, nil), }, }, link: multicluster.Link{ @@ -379,13 +362,14 @@ var onDeleteWithGatewayMetadata = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } -var onDeleteNoGatewayMetadata = &testEnvironment{ +var onDeleteNonExportedService = &testEnvironment{ events: []interface{}{ &OnDeleteCalled{ - svc: remoteService("gateway", "test-namespace", "", "", "currentResVersion", nil), + svc: remoteService("gateway", "test-namespace", "currentResVersion", map[string]string{}, nil), }, }, link: multicluster.Link{ @@ -395,6 +379,7 @@ var onDeleteNoGatewayMetadata = &testEnvironment{ GatewayAddress: "192.0.2.127", GatewayPort: 888, ProbeSpec: defaultProbeSpec, + Selector: *defaultSelector, }, } @@ -452,13 +437,7 @@ func diffEndpoints(expected, actual *corev1.Endpoints) error { return nil } -func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort) *corev1.Service { - annotations := make(map[string]string) - if gtwName != "" && gtwNs != "" { - annotations[consts.GatewayNameAnnotation] = gtwName - annotations[consts.GatewayNsAnnotation] = gtwNs - } - +func remoteService(name, namespace, resourceVersion string, labels map[string]string, ports []corev1.ServicePort) *corev1.Service { return &corev1.Service{ TypeMeta: metav1.TypeMeta{ Kind: "Service", @@ -468,7 +447,7 @@ func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, port Name: name, Namespace: namespace, ResourceVersion: resourceVersion, - Annotations: annotations, + Labels: labels, }, Spec: corev1.ServiceSpec{ Ports: ports, @@ -476,8 +455,8 @@ func remoteService(name, namespace, gtwName, gtwNs, resourceVersion string, port } } -func remoteServiceAsYaml(name, namespace, gtwName, gtwNs, resourceVersion string, ports []corev1.ServicePort) string { - svc := remoteService(name, namespace, gtwName, gtwNs, resourceVersion, ports) +func remoteServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string { + svc := remoteService(name, namespace, resourceVersion, nil, ports) bytes, err := yaml.Marshal(svc) if err != nil { @@ -532,10 +511,9 @@ func gateway(name, namespace, resourceVersion, ip, hostname, portName string, po Namespace: namespace, ResourceVersion: resourceVersion, Annotations: map[string]string{ - consts.GatewayIdentity: identity, - consts.GatewayProbePath: probePath, - consts.GatewayProbePeriod: fmt.Sprint(probePeriod), - consts.MulticlusterGatewayAnnotation: "true", + consts.GatewayIdentity: identity, + consts.GatewayProbePath: probePath, + consts.GatewayProbePeriod: fmt.Sprint(probePeriod), }, }, Spec: corev1.ServiceSpec{ diff --git a/controller/cmd/service-mirror/main.go b/controller/cmd/service-mirror/main.go index 378b5918cb3ae..4faf4c5771a09 100644 --- a/controller/cmd/service-mirror/main.go +++ b/controller/cmd/service-mirror/main.go @@ -114,7 +114,7 @@ func Main(args []string) { } } -func loadCredentials(link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) (*servicemirror.WatchedClusterConfig, error) { +func loadCredentials(link multicluster.Link, namespace string, k8sAPI *k8s.KubernetesAPI) ([]byte, error) { // Load the credentials secret secret, err := k8sAPI.Interface.CoreV1().Secrets(namespace).Get(link.ClusterCredentialsSecret, metav1.GetOptions{}) if err != nil { @@ -126,7 +126,7 @@ func loadCredentials(link multicluster.Link, namespace string, k8sAPI *k8s.Kuber func restartClusterWatcher( link multicluster.Link, namespace string, - creds *servicemirror.WatchedClusterConfig, + creds []byte, controllerK8sAPI *controllerK8s.API, requeueLimit int, repairPeriod time.Duration, @@ -139,7 +139,7 @@ func restartClusterWatcher( probeWorker.Stop() } - cfg, err := clientcmd.RESTConfigFromKubeConfig(creds.APIConfig) + cfg, err := clientcmd.RESTConfigFromKubeConfig(creds) if err != nil { log.Errorf("Unable to parse kube config: %s", err) return diff --git a/pkg/healthcheck/healthcheck_multicluster.go b/pkg/healthcheck/healthcheck_multicluster.go index fc91757a5b5c1..670feeb4c71ac 100644 --- a/pkg/healthcheck/healthcheck_multicluster.go +++ b/pkg/healthcheck/healthcheck_multicluster.go @@ -302,15 +302,15 @@ func (hc *HealthChecker) checkRemoteClusterConnectivity() error { continue } - clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config.APIConfig) + clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config) if err != nil { - errors = append(errors, fmt.Errorf("* secret: [%s/%s] cluster: [%s]: unable to parse api config: %s", secret.Namespace, secret.Name, config.ClusterName, err)) + errors = append(errors, fmt.Errorf("* secret: [%s/%s] cluster: [%s]: unable to parse api config: %s", secret.Namespace, secret.Name, link.TargetClusterName, err)) continue } remoteAPI, err := k8s.NewAPIForConfig(clientConfig, "", []string{}, requestTimeout) if err != nil { - errors = append(errors, fmt.Errorf("* secret: [%s/%s] cluster: [%s]: could not instantiate api for target cluster: %s", secret.Namespace, secret.Name, config.ClusterName, err)) + errors = append(errors, fmt.Errorf("* secret: [%s/%s] cluster: [%s]: could not instantiate api for target cluster: %s", secret.Namespace, secret.Name, link.TargetClusterName, err)) continue } @@ -328,7 +328,7 @@ func (hc *HealthChecker) checkRemoteClusterConnectivity() error { } if err := comparePermissions(expectedServiceMirrorRemoteClusterPolicyVerbs, verbs); err != nil { - errors = append(errors, fmt.Errorf("* cluster: [%s]: Insufficient Service permissions: %s", config.ClusterName, err)) + errors = append(errors, fmt.Errorf("* cluster: [%s]: Insufficient Service permissions: %s", link.TargetClusterName, err)) } links = append(links, fmt.Sprintf("\t* %s", link.TargetClusterName)) @@ -366,15 +366,15 @@ func (hc *HealthChecker) checkRemoteClusterAnchors() error { continue } - clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config.APIConfig) + clientConfig, err := clientcmd.RESTConfigFromKubeConfig(config) if err != nil { - errors = append(errors, fmt.Sprintf("* secret: [%s/%s] cluster: [%s]: unable to parse api config: %s", secret.Namespace, secret.Name, config.ClusterName, err)) + errors = append(errors, fmt.Sprintf("* secret: [%s/%s] cluster: [%s]: unable to parse api config: %s", secret.Namespace, secret.Name, link.TargetClusterName, err)) continue } remoteAPI, err := k8s.NewAPIForConfig(clientConfig, "", []string{}, requestTimeout) if err != nil { - errors = append(errors, fmt.Sprintf("* secret: [%s/%s] cluster: [%s]: could not instantiate api for target cluster: %s", secret.Namespace, secret.Name, config.ClusterName, err)) + errors = append(errors, fmt.Sprintf("* secret: [%s/%s] cluster: [%s]: could not instantiate api for target cluster: %s", secret.Namespace, secret.Name, link.TargetClusterName, err)) continue } @@ -506,7 +506,7 @@ func (hc *HealthChecker) checkIfMirrorServicesHaveEndpoints() error { // Check if there is a relevant end-point endpoint, err := hc.kubeAPI.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{}) if err != nil || len(endpoint.Subsets) == 0 { - servicesWithNoEndpoints = append(servicesWithNoEndpoints, fmt.Sprintf("%s.%s mirrored from cluster [%s] (gateway: [%s/%s])", svc.Name, svc.Namespace, svc.Labels[k8s.RemoteClusterNameLabel], svc.Labels[k8s.RemoteGatewayNsLabel], svc.Labels[k8s.RemoteGatewayNameLabel])) + servicesWithNoEndpoints = append(servicesWithNoEndpoints, fmt.Sprintf("%s.%s mirrored from cluster [%s]", svc.Name, svc.Namespace, svc.Labels[k8s.RemoteClusterNameLabel])) } } diff --git a/pkg/k8s/labels.go b/pkg/k8s/labels.go index 32165a8f453df..6dc47f72e7974 100644 --- a/pkg/k8s/labels.go +++ b/pkg/k8s/labels.go @@ -384,21 +384,9 @@ const ( // the access information for remote clusters. MirrorSecretType = SvcMirrorPrefix + "/remote-kubeconfig" - // GatewayNameAnnotation is the annotation that is present on the remote - // service, indicating which gateway is supposed to route traffic to it - GatewayNameAnnotation = SvcMirrorPrefix + "/gateway-name" - - // RemoteGatewayNameLabel is same as GatewayNameAnnotation but on the local, - // mirrored service. It's used for quick querying when we want to figure out - // the services that are being associated with a certain gateway - RemoteGatewayNameLabel = SvcMirrorPrefix + "/remote-gateway-name" - - // GatewayNsAnnotation is present on the remote service, indicating the ns - // in which we can find the gateway - GatewayNsAnnotation = SvcMirrorPrefix + "/gateway-ns" - - // RemoteGatewayNsLabel follows the same kind of logic as RemoteGatewayNameLabel - RemoteGatewayNsLabel = SvcMirrorPrefix + "/remote-gateway-ns" + // DefaultExportedServiceSelector is the default label selector for exported + // services. + DefaultExportedServiceSelector = SvcMirrorPrefix + "/exported" // MirroredResourceLabel indicates that this resource is the result // of a mirroring operation (can be a namespace or a service) @@ -407,35 +395,10 @@ const ( // MirroredGatewayLabel indicates that this is a mirrored gateway MirroredGatewayLabel = SvcMirrorPrefix + "/mirrored-gateway" - // MirroredGatewayProbePeriod specifies the probe period for the gateway mirror - MirroredGatewayProbePeriod = SvcMirrorPrefix + "/mirrored-gateway-probe-period" - - // MirroredGatewayProbePath specifies the probe path for the gateway mirror - MirroredGatewayProbePath = SvcMirrorPrefix + "/mirrored-gateway-probe-path" - - // MirroredGatewayRemoteName specifies the name of the remote gateway that has been mirrored - MirroredGatewayRemoteName = SvcMirrorPrefix + "/mirrored-gateway-remote-name" - - // MirroredGatewayRemoteNameSpace specifies the namespace of the remote gateway that has been mirrored - MirroredGatewayRemoteNameSpace = SvcMirrorPrefix + "/mirrored-gateway-remote-namespace" - - // MulticlusterGatewayAnnotation indicates that this service is a - // gateway - MulticlusterGatewayAnnotation = SvcMirrorPrefix + "/multicluster-gateway" - // RemoteClusterNameLabel put on a local mirrored service, it // allows us to associate a mirrored service with a remote cluster RemoteClusterNameLabel = SvcMirrorPrefix + "/cluster-name" - // RemoteClusterDomainAnnotation is present on the secret - // carrying the config of the remote cluster, to allow for - // using custom cluster domains - RemoteClusterDomainAnnotation = SvcMirrorPrefix + "/remote-cluster-domain" - - // RemoteClusterLinkerdNamespaceAnnotation is present on the secret - // carrying the config of the remote cluster - RemoteClusterLinkerdNamespaceAnnotation = SvcMirrorPrefix + "/remote-cluster-l5d-ns" - // RemoteResourceVersionAnnotation is the last observed remote resource // version of a mirrored resource. Useful when doing updates RemoteResourceVersionAnnotation = SvcMirrorPrefix + "/remote-resource-version" diff --git a/pkg/multicluster/link.go b/pkg/multicluster/link.go index 2503301a23880..9441fe712b4d0 100644 --- a/pkg/multicluster/link.go +++ b/pkg/multicluster/link.go @@ -1,6 +1,7 @@ package multicluster import ( + "encoding/json" "errors" "fmt" "strconv" @@ -41,6 +42,7 @@ type ( GatewayPort uint32 GatewayIdentity string ProbeSpec ProbeSpec + Selector metav1.LabelSelector } ) @@ -121,6 +123,18 @@ func NewLink(u unstructured.Unstructured) (Link, error) { return Link{}, err } + selector := metav1.LabelSelector{} + if selectorObj, ok := specObj["selector"]; ok { + bytes, err := json.Marshal(selectorObj) + if err != nil { + return Link{}, err + } + err = json.Unmarshal(bytes, &selector) + if err != nil { + return Link{}, err + } + } + return Link{ Name: u.GetName(), Namespace: u.GetNamespace(), @@ -132,14 +146,40 @@ func NewLink(u unstructured.Unstructured) (Link, error) { GatewayPort: uint32(gatewayPort), GatewayIdentity: gatewayIdentity, ProbeSpec: probeSpec, + Selector: selector, }, nil } // ToUnstructured converts a Link struct into an unstructured resource that can // be used by a kubernetes dynamic client. -func (l Link) ToUnstructured() unstructured.Unstructured { - return unstructured.Unstructured{ +func (l Link) ToUnstructured() (unstructured.Unstructured, error) { + spec := map[string]interface{}{ + "targetClusterName": l.TargetClusterName, + "targetClusterDomain": l.TargetClusterDomain, + "targetClusterLinkerdNamespace": l.TargetClusterLinkerdNamespace, + "clusterCredentialsSecret": l.ClusterCredentialsSecret, + "gatewayAddress": l.GatewayAddress, + "gatewayPort": fmt.Sprintf("%d", l.GatewayPort), + "gatewayIdentity": l.GatewayIdentity, + "probeSpec": map[string]interface{}{ + "path": l.ProbeSpec.Path, + "port": fmt.Sprintf("%d", l.ProbeSpec.Port), + "period": l.ProbeSpec.Period.String(), + }, + } + data, err := json.Marshal(l.Selector) + if err != nil { + return unstructured.Unstructured{}, err + } + selector := make(map[string]interface{}) + err = json.Unmarshal(data, &selector) + if err != nil { + return unstructured.Unstructured{}, err + } + spec["selector"] = selector + + return unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": k8s.LinkAPIGroupVersion, "kind": k8s.LinkKind, @@ -147,22 +187,9 @@ func (l Link) ToUnstructured() unstructured.Unstructured { "name": l.Name, "namespace": l.Namespace, }, - "spec": map[string]interface{}{ - "targetClusterName": l.TargetClusterName, - "targetClusterDomain": l.TargetClusterDomain, - "targetClusterLinkerdNamespace": l.TargetClusterLinkerdNamespace, - "clusterCredentialsSecret": l.ClusterCredentialsSecret, - "gatewayAddress": l.GatewayAddress, - "gatewayPort": fmt.Sprintf("%d", l.GatewayPort), - "gatewayIdentity": l.GatewayIdentity, - "probeSpec": map[string]interface{}{ - "path": l.ProbeSpec.Path, - "port": fmt.Sprintf("%d", l.ProbeSpec.Port), - "period": l.ProbeSpec.Period.String(), - }, - }, + "spec": spec, }, - } + }, nil } // ExtractProbeSpec parses the ProbSpec from a gateway service's annotations. diff --git a/pkg/servicemirror/util.go b/pkg/servicemirror/util.go index caa7b8a35a92e..5172999e7e7be 100644 --- a/pkg/servicemirror/util.go +++ b/pkg/servicemirror/util.go @@ -7,39 +7,13 @@ import ( corev1 "k8s.io/api/core/v1" ) -// WatchedClusterConfig contains the needed data to identify a remote cluster -type WatchedClusterConfig struct { - APIConfig []byte - ClusterName string - ClusterDomain string - LinkerdNamespace string -} - // ParseRemoteClusterSecret extracts the credentials used to access the remote cluster -func ParseRemoteClusterSecret(secret *corev1.Secret) (*WatchedClusterConfig, error) { - clusterName, hasClusterName := secret.Annotations[consts.RemoteClusterNameLabel] +func ParseRemoteClusterSecret(secret *corev1.Secret) ([]byte, error) { config, hasConfig := secret.Data[consts.ConfigKeyName] - domain, hasDomain := secret.Annotations[consts.RemoteClusterDomainAnnotation] - l5dNamespace, hasL5dNamespace := secret.Annotations[consts.RemoteClusterLinkerdNamespaceAnnotation] - if !hasClusterName { - return nil, fmt.Errorf("secret of type %s should contain key %s", consts.MirrorSecretType, consts.ConfigKeyName) - } if !hasConfig { return nil, fmt.Errorf("secret should contain target cluster name as annotation %s", consts.RemoteClusterNameLabel) } - if !hasDomain { - return nil, fmt.Errorf("secret should contain target cluster domain as annotation %s", consts.RemoteClusterDomainAnnotation) - } - - if !hasL5dNamespace { - return nil, fmt.Errorf("secret should contain target cluster linkerd installation namespace as annotation %s", consts.RemoteClusterLinkerdNamespaceAnnotation) - } - return &WatchedClusterConfig{ - APIConfig: config, - ClusterName: clusterName, - ClusterDomain: domain, - LinkerdNamespace: l5dNamespace, - }, nil + return config, nil }