From b2f9ed5e7be7a9374710f1d0c5f373f22e6e79ef Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Wed, 17 Oct 2018 22:17:51 -0500 Subject: [PATCH] catalog/from-k8s: adds support for ClusterIP Service --- catalog/from-k8s/resource.go | 58 ++++++- catalog/from-k8s/resource_test.go | 241 ++++++++++++++++++++++++++++++ 2 files changed, 297 insertions(+), 2 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 18e18dad76..918f805521 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -193,7 +193,11 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool { return false } - return svc.Spec.Type == apiv1.ServiceTypeNodePort + switch svc.Spec.Type { + case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: + return true + } + return false } // generateRegistrations generates the necessary Consul registrations for @@ -243,10 +247,11 @@ func (t *ServiceResource) generateRegistrations(key string) { baseService.Service = strings.TrimSpace(v) } + var main string // Determine the default port if len(svc.Spec.Ports) > 0 { nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort - main := svc.Spec.Ports[0].Name + main = svc.Spec.Ports[0].Name // If a specific port is specified, then use that port value if target, ok := svc.Annotations[annotationServicePort]; ok { @@ -313,6 +318,55 @@ func (t *ServiceResource) generateRegistrations(key string) { } switch svc.Spec.Type { + case apiv1.ServiceTypeClusterIP: + if t.endpointsMap == nil { + return + } + + endpoints := t.endpointsMap[key] + if endpoints == nil { + return + } + + seen := map[string]struct{}{} + for _, subset := range endpoints.Subsets { + for _, subsetAddr := range subset.Addresses { + addr := subsetAddr.IP + if addr == "" { + addr = subsetAddr.Hostname + } + if addr == "" { + continue + } + + // Its not clear whether K8S guarantees ready addresses to + // be unique so we maintain a set to prevent duplicates just + // in case. + if _, ok := seen[addr]; ok { + continue + } + seen[addr] = struct{}{} + + for _, port := range subset.Ports { + if port.Name == main { + baseService.Port = int(port.Port) + break + } + } + + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, addr) + r.Service.Address = addr + if subsetAddr.NodeName != nil { + r.Node = *subsetAddr.NodeName + r.Address = addr + } + + t.consulMap[key] = append(t.consulMap[key], &r) + } + } // For LoadBalancer type services, we create a service instance for // each LoadBalancer entry. We only support entries that have an IP // address assigned (not hostnames). diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index a6e95fe1ff..6658e36bee 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -655,6 +655,247 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.Equal("2.3.4.5", actual[1].Service.Address) } +// Test that the proper registrations are generated for a ClusterIP type. +func TestServiceResource_clusterIP(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a ClusterIP type with multiple ports. +func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(8080, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(8080, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a ClusterIP type with annotated override. +func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{annotationServicePort: "rpc"}, + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(2000, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(2000, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // testService returns a service that will result in a registration. func testService(name string) *apiv1.Service { return &apiv1.Service{