diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 508c7166bcee..36bdef6655df 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -30,6 +30,21 @@ const ( ConsulK8STag = "k8s" ) +type NodePortSyncType string + +const ( + // Only sync NodePort services with a node's ExternalIP address. + // Doesn't sync if an ExternalIP doesn't exist + ExternalOnly NodePortSyncType = "ExternalOnly" + + // Sync with an ExternalIP first, if it doesn't exist, use the + // node's InternalIP address instead + ExternalFirst NodePortSyncType = "ExternalFirst" + + // Sync NodePort services using + InternalOnly NodePortSyncType = "InternalOnly" +) + // ServiceResource implements controller.Resource to sync Service resource // types from K8S. type ServiceResource struct { @@ -47,6 +62,11 @@ type ServiceResource struct { // Setting this to false will ignore ClusterIP services during the sync. ClusterIPSync bool + // NodeExternalIPSync set to true (the default) syncs NodePort services + // using the node's external ip address. When false, the node's internal + // ip address will be used instead. + NodePortSync NodePortSyncType + // serviceMap is a mapping of unique key (given by controller) to // the service structure. endpointsMap is the mapping of the same // uniqueKey to a set of endpoints. @@ -252,35 +272,60 @@ func (t *ServiceResource) generateRegistrations(key string) { baseService.Service = strings.TrimSpace(v) } - // Determine the default port + // Determine the default port and set port annotations if len(svc.Spec.Ports) > 0 { - nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort - main := svc.Spec.Ports[0].Name + // Create port variable, defaults to 0 + var port int + + // Flag identifying whether the service is of NodePort type + isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort // If a specific port is specified, then use that port value - if target, ok := svc.Annotations[annotationServicePort]; ok { - main = target + target, ok := svc.Annotations[annotationServicePort] + if ok { if v, err := strconv.ParseInt(target, 0, 0); err == nil { - baseService.Port = int(v) + port = int(v) } } - // Go through the ports so we can add them to the service meta. We - // also use this opportunity to find our default port. - for _, p := range svc.Spec.Ports { - target := p.Port - if nodePort && p.NodePort > 0 { - target = p.NodePort + // For when the port was a name instead of an int + if port == 0 && target != "" { + // Find the named port + for _, p := range svc.Spec.Ports { + if p.Name == target { + // Pick the right port based on the type of service + if isNodePort && p.NodePort > 0 { + port = int(p.NodePort) + } else { + port = int(p.Port) + } + } } + } - // Set the tag - baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(target), 10) - - // If the name matches our main port, set our main port - if p.Name == main { - baseService.Port = int(target) + // If the port was not set above, set it with the first port + // based on the service type + if port == 0 && isNodePort { + // Find first defined NodePort + for _, p := range svc.Spec.Ports { + if p.NodePort > 0 { + port = int(p.NodePort) + break + } } } + if port == 0 && !isNodePort { + port = int(svc.Spec.Ports[0].Port) + } + + // Set service port based on defined port + baseService.Port = port + + // Add all the ports as annotations + for _, p := range svc.Spec.Ports { + // Set the tag + baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(p.Port), 10) + } } // Parse any additional tags @@ -350,9 +395,83 @@ func (t *ServiceResource) generateRegistrations(key string) { } // For NodePort services, we create a service instance for each - // endpoint of the service. This way we don't register _every_ K8S + // endpoint of the service, which corresponds to the nodes the service's + // pods are running on. This way we don't register _every_ K8S // node as part of the service. - case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: + case apiv1.ServiceTypeNodePort: + if t.endpointsMap == nil { + return + } + + endpoints := t.endpointsMap[key] + if endpoints == nil { + return + } + + for _, subset := range endpoints.Subsets { + for _, subsetAddr := range subset.Addresses { + // Check that the node name exists + // subsetAddr.NodeName is of type *string + if subsetAddr.NodeName == nil { + continue + } + + // Look up the node's ip address by getting node info + node, err := t.Client.CoreV1().Nodes().Get(*subsetAddr.NodeName, metav1.GetOptions{}) + if err != nil { + t.Log.Warn("error getting node info", "error", err) + continue + } + + // Set the expected node address type + var expectedType apiv1.NodeAddressType + if t.NodePortSync == InternalOnly { + expectedType = apiv1.NodeInternalIP + } else { + expectedType = apiv1.NodeExternalIP + } + + // Find the ip address for the node and + // create the Consul service using it + var found bool + for _, address := range node.Status.Addresses { + if address.Type == expectedType { + found = true + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, address.Address) + r.Service.Address = address.Address + r.Node = *subsetAddr.NodeName + r.Address = address.Address + + t.consulMap[key] = append(t.consulMap[key], &r) + } + } + + // If an ExternalIP wasn't found, and ExternalFirst is set, + // use an InternalIP + if t.NodePortSync == ExternalFirst && !found { + for _, address := range node.Status.Addresses { + if address.Type == apiv1.NodeInternalIP { + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, address.Address) + r.Service.Address = address.Address + r.Node = *subsetAddr.NodeName + r.Address = address.Address + + t.consulMap[key] = append(t.consulMap[key], &r) + } + } + } + } + } + + // For ClusterIP services, we register a service instance + // for each pod. + case apiv1.ServiceTypeClusterIP: if t.endpointsMap == nil { return } diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index 7fcef471b3a2..43b2d4c51cf5 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -523,16 +523,413 @@ func TestServiceResource_nodePort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(30000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 1) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) +} + +// Test that a NodePort created earlier works (doesn't require an Endpoints +// update event). +func TestServiceResource_nodePortInitial(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, + }) + defer closer() + time.Sleep(100 * time.Millisecond) + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(400 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(30000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) +} + +// Test that the proper registrations are generated for a NodePort with annotated port. +func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, }) - defer closer() + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) // Insert the service - _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: "foo", + Annotations: map[string]string{annotationServicePort: "rpc"}, }, Spec: apiv1.ServiceSpec{ @@ -548,8 +945,70 @@ func TestServiceResource_nodePort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30001, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(30001, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + +// Test that the proper registrations are generated for a NodePort with annotated port. +func TestServiceResource_nodePortUnnamedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, + }) + defer closer() + node1 := "ip-10-11-12-13.ec2.internal" node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + // Insert the endpoints _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -562,8 +1021,8 @@ func TestServiceResource_nodePort(t *testing.T) { apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, }, Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, + apiv1.EndpointPort{Port: 8080}, + apiv1.EndpointPort{Port: 2000}, }, }, @@ -572,8 +1031,8 @@ func TestServiceResource_nodePort(t *testing.T) { apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, }, Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, + apiv1.EndpointPort{Port: 8080}, + apiv1.EndpointPort{Port: 2000}, }, }, }, @@ -583,6 +1042,25 @@ func TestServiceResource_nodePort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + // Verify what we got syncer.Lock() defer syncer.Unlock() @@ -599,9 +1077,8 @@ func TestServiceResource_nodePort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } -// Test that a NodePort created earlier works (doesn't require an Endpoints -// update event). -func TestServiceResource_nodePortInitial(t *testing.T) { +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { t.Parallel() require := require.New(t) client := fake.NewSimpleClientset() @@ -609,17 +1086,47 @@ func TestServiceResource_nodePortInitial(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: InternalOnly, }) defer closer() - time.Sleep(100 * time.Millisecond) node1 := "ip-10-11-12-13.ec2.internal" node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + // Insert the endpoints - _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, @@ -647,7 +1154,9 @@ func TestServiceResource_nodePortInitial(t *testing.T) { }, }) require.NoError(err) - time.Sleep(200 * time.Millisecond) + + // Wait a bit + time.Sleep(300 * time.Millisecond) // Insert the service _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ @@ -666,7 +1175,7 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.NoError(err) // Wait a bit - time.Sleep(400 * time.Millisecond) + time.Sleep(300 * time.Millisecond) // Verify what we got syncer.Lock() @@ -674,17 +1183,18 @@ func TestServiceResource_nodePortInitial(t *testing.T) { actual := syncer.Registrations require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) - require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal("4.5.6.7", actual[0].Service.Address) require.Equal(30000, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) - require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal("3.4.5.6", actual[1].Service.Address) require.Equal(30000, actual[1].Service.Port) require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } -// Test that the proper registrations are generated for a NodePort with annotated port. -func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { t.Parallel() require := require.New(t) client := fake.NewSimpleClientset() @@ -692,34 +1202,44 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalFirst, }) defer closer() - // Insert the service - _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Annotations: map[string]string{annotationServicePort: "rpc"}, + Name: node1, }, - Spec: apiv1.ServiceSpec{ - Type: apiv1.ServiceTypeNodePort, - Ports: []apiv1.ServicePort{ - apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, - apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, }, }, }) require.NoError(err) - // Wait a bit - time.Sleep(300 * time.Millisecond) + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) - node1 := "ip-10-11-12-13.ec2.internal" - node2 := "ip-10-11-12-14.ec2.internal" // Insert the endpoints _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -753,21 +1273,37 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + // Verify what we got syncer.Lock() defer syncer.Unlock() actual := syncer.Registrations require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) - require.Equal("1.2.3.4", actual[0].Service.Address) - - // This is an odd case-- currently if there are multiple NodePorts configured - // for a service, we'll take the last one. - require.Equal(30001, actual[0].Service.Port) + require.Equal("4.5.6.7", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) require.Equal("2.3.4.5", actual[1].Service.Address) - require.Equal(30001, actual[1].Service.Port) + require.Equal(30000, actual[1].Service.Port) require.Equal(node2, actual[1].Node) require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } @@ -994,6 +1530,80 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a ClusterIP type with unnamed ports. +func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: true, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}, + apiv1.ServicePort{Port: 8500, TargetPort: intstr.FromInt(2000)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(80, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(80, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // Test that the ClusterIP services aren't synced when ClusterIPSync // is disabled. func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index f549ca0a5239..3d1ba09192cf 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -39,6 +39,7 @@ type Command struct { flagK8SWriteNamespace string flagConsulWritePeriod flags.DurationValue flagSyncClusterIPServices bool + flagNodePortSyncType string once sync.Once help string @@ -73,6 +74,9 @@ func (c *Command) init() { c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true, "If true, all valid ClusterIP services in K8S are synced by default. If false, "+ "ClusterIP services are not synced to Consul.") + c.flags.StringVar(&c.flagNodePortSyncType, "node-port-sync-type", "ExternalOnly", + "Defines the type of sync for NodePort services. Valid options are ExternalOnly, "+ + "InternalOnly and ExternalFirst.") c.http = &flags.HTTPFlags{} c.k8s = &k8sflags.K8SFlags{} @@ -142,6 +146,7 @@ func (c *Command) Run(args []string) int { Namespace: c.flagK8SSourceNamespace, ExplicitEnable: !c.flagK8SDefault, ClusterIPSync: c.flagSyncClusterIPServices, + NodePortSync: catalogFromK8S.NodePortSyncType(c.flagNodePortSyncType), }, }