From 3afc0338f136fbb2f18cbb5b011bfba08844799b Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Tue, 18 Dec 2018 16:43:06 -0800 Subject: [PATCH 1/5] Sync nodeport services from each kubernetes node The original plan was to only sync NodePort service instances for the nodes they were running on. Currently, they are being synced with the pod's ip address instead, which is not guaranteed to be routable. For users running in this type of environment, this is causing these services to be reaped by the anti-entropy mechanism since their health checks fail due to the unroutability. Additionally, this provides the incorrect ip address for routing to NodePort services, which makes the sync ineffective. Given that there is not an easy way to get the node info from a service, the easiest solution to this is to sync all of the kubernetes nodes for the NodePort service. This has the benefit of distributing traffic as Kuberentes expects it for this type of service. --- catalog/from-k8s/resource.go | 33 +++++- catalog/from-k8s/resource_test.go | 160 ++++++++++++++---------------- 2 files changed, 106 insertions(+), 87 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 508c7166bcee..f8ec10e23524 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -349,10 +349,37 @@ func (t *ServiceResource) generateRegistrations(key string) { t.consulMap[key] = append(t.consulMap[key], &r) } - // For NodePort services, we create a service instance for each - // endpoint of the service. This way we don't register _every_ K8S + // For NodePort services, we register each K8S // node as part of the service. - case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP: + case apiv1.ServiceTypeNodePort: + // Get all nodes to be able to reference their ip addresses + nodes, err := t.Client.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil || len(nodes.Items) == 0 { + t.Log.Warn("error getting nodes", "error", err) + return + } + + // Create a service instance for each node + for _, node := range nodes.Items { + for _, address := range node.Status.Addresses { + if address.Type == apiv1.NodeExternalIP { + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, address.Address) + r.Service.Address = address.Address + r.Address = address.Address + + if node.Name != "" { + r.Node = node.Name + } + + t.consulMap[key] = append(t.consulMap[key], &r) + } + } + } + + case apiv1.ServiceTypeClusterIP: if t.endpointsMap == nil { return } diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index 7fcef471b3a2..23348468a1eb 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -529,52 +529,50 @@ func TestServiceResource_nodePort(t *testing.T) { }) defer closer() - // Insert the service - _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: node1, }, - Spec: apiv1.ServiceSpec{ - Type: apiv1.ServiceTypeNodePort, - Ports: []apiv1.ServicePort{ - apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, - apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, }, }, }) require.NoError(err) + require.NoError(err) // Wait a bit time.Sleep(300 * time.Millisecond) - node1 := "ip-10-11-12-13.ec2.internal" - node2 := "ip-10-11-12-14.ec2.internal" - // Insert the endpoints - _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", }, - Subsets: []apiv1.EndpointSubset{ - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, - }, - - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, }, }, }) @@ -618,31 +616,28 @@ func TestServiceResource_nodePortInitial(t *testing.T) { node1 := "ip-10-11-12-13.ec2.internal" node2 := "ip-10-11-12-14.ec2.internal" - // Insert the endpoints - _, err := client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: node1, }, - Subsets: []apiv1.EndpointSubset{ - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, }, + }, + }) + require.NoError(err) - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, }, }, }) @@ -698,18 +693,30 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { }) defer closer() - // Insert the service - _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Annotations: map[string]string{annotationServicePort: "rpc"}, + Name: node1, }, - Spec: apiv1.ServiceSpec{ - Type: apiv1.ServiceTypeNodePort, - Ports: []apiv1.ServicePort{ - apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, - apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, }, }, }) @@ -718,33 +725,18 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) - node1 := "ip-10-11-12-13.ec2.internal" - node2 := "ip-10-11-12-14.ec2.internal" - // Insert the endpoints - _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", + Name: "foo", + Annotations: map[string]string{annotationServicePort: "rpc"}, }, - Subsets: []apiv1.EndpointSubset{ - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, - }, - - apiv1.EndpointSubset{ - Addresses: []apiv1.EndpointAddress{ - apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, - }, - Ports: []apiv1.EndpointPort{ - apiv1.EndpointPort{Name: "http", Port: 8080}, - apiv1.EndpointPort{Name: "rpc", Port: 2000}, - }, + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, }, }, }) From 39d78df4e91f15eb4402aa20bf071a5e975d33f8 Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Wed, 19 Dec 2018 14:32:45 -0800 Subject: [PATCH 2/5] Sync only nodes where the NodePort is running It *is* possible to find the node from an endpoint, so this turns into a straight up bug fix to match the intended design. It gets the routable node info into the sync, allowing folks to use it for external connections. This should also fix the anti-entropy issue that was removing NodePort services because of the nodes failing health checks. --- catalog/from-k8s/resource.go | 59 ++++++--- catalog/from-k8s/resource_test.go | 194 ++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 20 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index f8ec10e23524..74da4248a4d7 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -349,36 +349,55 @@ func (t *ServiceResource) generateRegistrations(key string) { t.consulMap[key] = append(t.consulMap[key], &r) } - // For NodePort services, we register each K8S + // For NodePort services, we create a service instance for each + // endpoint of the service, which corresponds to the nodes the service's + // pods are running on. This way we don't register _every_ K8S // node as part of the service. case apiv1.ServiceTypeNodePort: - // Get all nodes to be able to reference their ip addresses - nodes, err := t.Client.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil || len(nodes.Items) == 0 { - t.Log.Warn("error getting nodes", "error", err) + if t.endpointsMap == nil { return } - // Create a service instance for each node - for _, node := range nodes.Items { - for _, address := range node.Status.Addresses { - if address.Type == apiv1.NodeExternalIP { - r := baseNode - rs := baseService - r.Service = &rs - r.Service.ID = serviceID(r.Service.Service, address.Address) - r.Service.Address = address.Address - r.Address = address.Address - - if node.Name != "" { - r.Node = node.Name - } + endpoints := t.endpointsMap[key] + if endpoints == nil { + return + } - t.consulMap[key] = append(t.consulMap[key], &r) + for _, subset := range endpoints.Subsets { + for _, subsetAddr := range subset.Addresses { + // Check that the node name exists + // subsetAddr.NodeName is of type *string + if subsetAddr.NodeName == nil { + continue + } + + // Look up the node's ip address by getting node info + node, err := t.Client.CoreV1().Nodes().Get(*subsetAddr.NodeName, metav1.GetOptions{}) + if err != nil { + t.Log.Warn("error getting node info", "error", err) + continue + } + + // Find the external ip address for the node and + // create the Consul service using it + for _, address := range node.Status.Addresses { + if address.Type == apiv1.NodeExternalIP { + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, address.Address) + r.Service.Address = address.Address + r.Node = *subsetAddr.NodeName + r.Address = address.Address + + t.consulMap[key] = append(t.consulMap[key], &r) + } } } } + // For ClusterIP services, we register a service instance + // for each pod. case apiv1.ServiceTypeClusterIP: if t.endpointsMap == nil { return diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index 23348468a1eb..34e3d44e1af8 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -557,6 +557,36 @@ func TestServiceResource_nodePort(t *testing.T) { }, }) require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) require.NoError(err) // Wait a bit @@ -597,6 +627,104 @@ func TestServiceResource_nodePort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 1) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) +} + // Test that a NodePort created earlier works (doesn't require an Endpoints // update event). func TestServiceResource_nodePortInitial(t *testing.T) { @@ -644,6 +772,39 @@ func TestServiceResource_nodePortInitial(t *testing.T) { require.NoError(err) time.Sleep(200 * time.Millisecond) + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + // Insert the service _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -725,6 +886,39 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Wait a bit time.Sleep(300 * time.Millisecond) + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + // Insert the service _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ From ee37b63438d0f0c514f84a98ccb27c1d696a43c9 Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Wed, 19 Dec 2018 17:17:05 -0800 Subject: [PATCH 3/5] Fix NodePort port selection when multiple NodePorts are defined If more than one NodePort was defined for a service, rather than taking the first, the last was taken instead. This update takes the first as expected. It additionally now takes the first port when using unnamed ports, as well as adds tests for unnamed ports for both NodePort and ClusterIP type services. --- catalog/from-k8s/resource.go | 61 +++++++--- catalog/from-k8s/resource_test.go | 192 +++++++++++++++++++++++++++++- 2 files changed, 232 insertions(+), 21 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 74da4248a4d7..66a9e2354ab6 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -252,35 +252,60 @@ func (t *ServiceResource) generateRegistrations(key string) { baseService.Service = strings.TrimSpace(v) } - // Determine the default port + // Determine the default port and set port annotations if len(svc.Spec.Ports) > 0 { - nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort - main := svc.Spec.Ports[0].Name + // Create port variable, defaults to 0 + var port int + + // Flag identifying whether the service is of NodePort type + isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort // If a specific port is specified, then use that port value - if target, ok := svc.Annotations[annotationServicePort]; ok { - main = target + target, ok := svc.Annotations[annotationServicePort] + if ok { if v, err := strconv.ParseInt(target, 0, 0); err == nil { - baseService.Port = int(v) + port = int(v) } } - // Go through the ports so we can add them to the service meta. We - // also use this opportunity to find our default port. - for _, p := range svc.Spec.Ports { - target := p.Port - if nodePort && p.NodePort > 0 { - target = p.NodePort + // For when the port was a name instead of an int + if port == 0 && target != "" { + // Find the named port + for _, p := range svc.Spec.Ports { + if p.Name == target { + // Pick the right port based on the type of service + if isNodePort && p.NodePort > 0 { + port = int(p.NodePort) + } else { + port = int(p.Port) + } + } } + } - // Set the tag - baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(target), 10) - - // If the name matches our main port, set our main port - if p.Name == main { - baseService.Port = int(target) + // If the port was not set above, set it with the first port + // based on the service type + if port == 0 && isNodePort { + // Find first defined NodePort + for _, p := range svc.Spec.Ports { + if p.NodePort > 0 { + port = int(p.NodePort) + break + } } } + if port == 0 && !isNodePort { + port = int(svc.Spec.Ports[0].Port) + } + + // Set service port based on defined port + baseService.Port = port + + // Add all the ports as annotations + for _, p := range svc.Spec.Ports { + // Set the tag + baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(p.Port), 10) + } } // Parse any additional tags diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index 34e3d44e1af8..e4ff9522995d 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -946,9 +946,6 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { require.Len(actual, 2) require.Equal("foo", actual[0].Service.Service) require.Equal("1.2.3.4", actual[0].Service.Address) - - // This is an odd case-- currently if there are multiple NodePorts configured - // for a service, we'll take the last one. require.Equal(30001, actual[0].Service.Port) require.Equal(node1, actual[0].Node) require.Equal("foo", actual[1].Service.Service) @@ -958,6 +955,121 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a NodePort with annotated port. +func TestServiceResource_nodePortUnnamedPort(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + apiv1.EndpointPort{Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Port: 8080}, + apiv1.EndpointPort{Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(30000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // Test that the proper registrations are generated for a ClusterIP type. func TestServiceResource_clusterIP(t *testing.T) { t.Parallel() @@ -1180,6 +1292,80 @@ func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a ClusterIP type with unnamed ports. +func TestServiceResource_clusterIPUnnamedPorts(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + ClusterIPSync: true, + }) + defer closer() + + // Insert the service + _, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeClusterIP, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)}, + apiv1.ServicePort{Port: 8500, TargetPort: intstr.FromInt(2000)}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "1.2.3.4"}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{IP: "2.3.4.5"}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("1.2.3.4", actual[0].Service.Address) + require.Equal(80, actual[0].Service.Port) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(80, actual[1].Service.Port) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // Test that the ClusterIP services aren't synced when ClusterIPSync // is disabled. func TestServiceResource_clusterIPSyncDisabled(t *testing.T) { From b32385d1c761f525615227b4d868705d12640fcc Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Mon, 7 Jan 2019 15:03:55 -0800 Subject: [PATCH 4/5] Add a flag to specify using a node's internal or external ip address Some users may not be exposing any external ip addresses, or may prefer to have all the routing happen within their network. This allows users to choose whether to sync NodePort services with either the node's External or Internal IP address. --- catalog/from-k8s/resource.go | 17 +++- catalog/from-k8s/resource_test.go | 153 ++++++++++++++++++++++++++--- subcommand/sync-catalog/command.go | 17 ++-- 3 files changed, 164 insertions(+), 23 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 66a9e2354ab6..7a4eac2fefab 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -47,6 +47,11 @@ type ServiceResource struct { // Setting this to false will ignore ClusterIP services during the sync. ClusterIPSync bool + // NodeExternalIPSync set to true (the default) syncs NodePort services + // using the node's external ip address. When false, the node's internal + // ip address will be used instead. + NodeExternalIPSync bool + // serviceMap is a mapping of unique key (given by controller) to // the service structure. endpointsMap is the mapping of the same // uniqueKey to a set of endpoints. @@ -403,10 +408,18 @@ func (t *ServiceResource) generateRegistrations(key string) { continue } - // Find the external ip address for the node and + // Set the expected node address type + var expectedType apiv1.NodeAddressType + if t.NodeExternalIPSync { + expectedType = apiv1.NodeExternalIP + } else { + expectedType = apiv1.NodeInternalIP + } + + // Find the ip address for the node and // create the Consul service using it for _, address := range node.Status.Addresses { - if address.Type == apiv1.NodeExternalIP { + if address.Type == expectedType { r := baseNode rs := baseService r.Service = &rs diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index e4ff9522995d..274d1d2c2c4c 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -523,9 +523,10 @@ func TestServiceResource_nodePort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: true, }) defer closer() @@ -540,6 +541,7 @@ func TestServiceResource_nodePort(t *testing.T) { Status: apiv1.NodeStatus{ Addresses: []apiv1.NodeAddress{ apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, }, }, }) @@ -553,6 +555,7 @@ func TestServiceResource_nodePort(t *testing.T) { Status: apiv1.NodeStatus{ Addresses: []apiv1.NodeAddress{ apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, }, }, }) @@ -636,9 +639,10 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: true, }) defer closer() @@ -735,9 +739,10 @@ func TestServiceResource_nodePortInitial(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: true, }) defer closer() time.Sleep(100 * time.Millisecond) @@ -848,9 +853,10 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: true, }) defer closer() @@ -964,9 +970,10 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: true, }) defer closer() @@ -1070,6 +1077,122 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_internalIP(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodeExternalIPSync: false, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "1.2.3.4"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("4.5.6.7", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("3.4.5.6", actual[1].Service.Address) + require.Equal(30000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // Test that the proper registrations are generated for a ClusterIP type. func TestServiceResource_clusterIP(t *testing.T) { t.Parallel() diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index f549ca0a5239..07b8cc0a4514 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -39,6 +39,7 @@ type Command struct { flagK8SWriteNamespace string flagConsulWritePeriod flags.DurationValue flagSyncClusterIPServices bool + flagSyncNodeExternalIP bool once sync.Once help string @@ -73,6 +74,9 @@ func (c *Command) init() { c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true, "If true, all valid ClusterIP services in K8S are synced by default. If false, "+ "ClusterIP services are not synced to Consul.") + c.flags.BoolVar(&c.flagSyncNodeExternalIP, "sync-node-external-ip", true, + "If true, NodePort services will be synced using the node's external ip address. "+ + "If false, NodePort services will be synced with the node's internal ip address.") c.http = &flags.HTTPFlags{} c.k8s = &k8sflags.K8SFlags{} @@ -136,12 +140,13 @@ func (c *Command) Run(args []string) int { ctl := &controller.Controller{ Log: hclog.Default().Named("to-consul/controller"), Resource: &catalogFromK8S.ServiceResource{ - Log: hclog.Default().Named("to-consul/source"), - Client: clientset, - Syncer: syncer, - Namespace: c.flagK8SSourceNamespace, - ExplicitEnable: !c.flagK8SDefault, - ClusterIPSync: c.flagSyncClusterIPServices, + Log: hclog.Default().Named("to-consul/source"), + Client: clientset, + Syncer: syncer, + Namespace: c.flagK8SSourceNamespace, + ExplicitEnable: !c.flagK8SDefault, + ClusterIPSync: c.flagSyncClusterIPServices, + NodeExternalIPSync: c.flagSyncNodeExternalIP, }, } From eca4b3b5dc660ba399a99d0f7b5468f346a19ee6 Mon Sep 17 00:00:00 2001 From: Rebecca Zanzig Date: Tue, 8 Jan 2019 11:21:39 -0800 Subject: [PATCH 5/5] Switch the sync to an enum instead of a boolean option This allows for a variety of sync options with NodePort. At this time, ExternalOnly, InternalOnly and ExternalFirst are implemented options. The first two limit the node ip's address to either the ExternalIP or the InternalIP. The third option uses the node's ExternalIP if it exists. If not, it instead uses the node's InternalIP. --- catalog/from-k8s/resource.go | 43 +++++++- catalog/from-k8s/resource_test.go | 165 ++++++++++++++++++++++++----- subcommand/sync-catalog/command.go | 22 ++-- 3 files changed, 190 insertions(+), 40 deletions(-) diff --git a/catalog/from-k8s/resource.go b/catalog/from-k8s/resource.go index 7a4eac2fefab..36bdef6655df 100644 --- a/catalog/from-k8s/resource.go +++ b/catalog/from-k8s/resource.go @@ -30,6 +30,21 @@ const ( ConsulK8STag = "k8s" ) +type NodePortSyncType string + +const ( + // Only sync NodePort services with a node's ExternalIP address. + // Doesn't sync if an ExternalIP doesn't exist + ExternalOnly NodePortSyncType = "ExternalOnly" + + // Sync with an ExternalIP first, if it doesn't exist, use the + // node's InternalIP address instead + ExternalFirst NodePortSyncType = "ExternalFirst" + + // Sync NodePort services using + InternalOnly NodePortSyncType = "InternalOnly" +) + // ServiceResource implements controller.Resource to sync Service resource // types from K8S. type ServiceResource struct { @@ -50,7 +65,7 @@ type ServiceResource struct { // NodeExternalIPSync set to true (the default) syncs NodePort services // using the node's external ip address. When false, the node's internal // ip address will be used instead. - NodeExternalIPSync bool + NodePortSync NodePortSyncType // serviceMap is a mapping of unique key (given by controller) to // the service structure. endpointsMap is the mapping of the same @@ -410,16 +425,18 @@ func (t *ServiceResource) generateRegistrations(key string) { // Set the expected node address type var expectedType apiv1.NodeAddressType - if t.NodeExternalIPSync { - expectedType = apiv1.NodeExternalIP - } else { + if t.NodePortSync == InternalOnly { expectedType = apiv1.NodeInternalIP + } else { + expectedType = apiv1.NodeExternalIP } // Find the ip address for the node and // create the Consul service using it + var found bool for _, address := range node.Status.Addresses { if address.Type == expectedType { + found = true r := baseNode rs := baseService r.Service = &rs @@ -431,6 +448,24 @@ func (t *ServiceResource) generateRegistrations(key string) { t.consulMap[key] = append(t.consulMap[key], &r) } } + + // If an ExternalIP wasn't found, and ExternalFirst is set, + // use an InternalIP + if t.NodePortSync == ExternalFirst && !found { + for _, address := range node.Status.Addresses { + if address.Type == apiv1.NodeInternalIP { + r := baseNode + rs := baseService + r.Service = &rs + r.Service.ID = serviceID(r.Service.Service, address.Address) + r.Service.Address = address.Address + r.Node = *subsetAddr.NodeName + r.Address = address.Address + + t.consulMap[key] = append(t.consulMap[key], &r) + } + } + } } } diff --git a/catalog/from-k8s/resource_test.go b/catalog/from-k8s/resource_test.go index 274d1d2c2c4c..43b2d4c51cf5 100644 --- a/catalog/from-k8s/resource_test.go +++ b/catalog/from-k8s/resource_test.go @@ -523,10 +523,10 @@ func TestServiceResource_nodePort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: true, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, }) defer closer() @@ -639,10 +639,10 @@ func TestServiceResource_nodePort_singleEndpoint(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: true, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, }) defer closer() @@ -739,10 +739,10 @@ func TestServiceResource_nodePortInitial(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: true, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, }) defer closer() time.Sleep(100 * time.Millisecond) @@ -853,10 +853,10 @@ func TestServiceResource_nodePortAnnotatedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: true, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, }) defer closer() @@ -970,10 +970,10 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: true, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalOnly, }) defer closer() @@ -1078,7 +1078,7 @@ func TestServiceResource_nodePortUnnamedPort(t *testing.T) { } // Test that the proper registrations are generated for a NodePort type. -func TestServiceResource_nodePort_internalIP(t *testing.T) { +func TestServiceResource_nodePort_internalOnlySync(t *testing.T) { t.Parallel() require := require.New(t) client := fake.NewSimpleClientset() @@ -1086,10 +1086,10 @@ func TestServiceResource_nodePort_internalIP(t *testing.T) { // Start the controller closer := controller.TestControllerRun(&ServiceResource{ - Log: hclog.Default(), - Client: client, - Syncer: syncer, - NodeExternalIPSync: false, + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: InternalOnly, }) defer closer() @@ -1193,6 +1193,121 @@ func TestServiceResource_nodePort_internalIP(t *testing.T) { require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) } +// Test that the proper registrations are generated for a NodePort type. +func TestServiceResource_nodePort_externalFirstSync(t *testing.T) { + t.Parallel() + require := require.New(t) + client := fake.NewSimpleClientset() + syncer := &TestSyncer{} + + // Start the controller + closer := controller.TestControllerRun(&ServiceResource{ + Log: hclog.Default(), + Client: client, + Syncer: syncer, + NodePortSync: ExternalFirst, + }) + defer closer() + + node1 := "ip-10-11-12-13.ec2.internal" + node2 := "ip-10-11-12-14.ec2.internal" + // Insert the nodes + _, err := client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node1, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "4.5.6.7"}, + }, + }, + }) + require.NoError(err) + + _, err = client.CoreV1().Nodes().Create(&apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node2, + }, + + Status: apiv1.NodeStatus{ + Addresses: []apiv1.NodeAddress{ + apiv1.NodeAddress{Type: apiv1.NodeExternalIP, Address: "2.3.4.5"}, + apiv1.NodeAddress{Type: apiv1.NodeInternalIP, Address: "3.4.5.6"}, + }, + }, + }) + require.NoError(err) + time.Sleep(200 * time.Millisecond) + + // Insert the endpoints + _, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Subsets: []apiv1.EndpointSubset{ + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node1, IP: "1.2.3.4"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + + apiv1.EndpointSubset{ + Addresses: []apiv1.EndpointAddress{ + apiv1.EndpointAddress{NodeName: &node2, IP: "2.3.4.5"}, + }, + Ports: []apiv1.EndpointPort{ + apiv1.EndpointPort{Name: "http", Port: 8080}, + apiv1.EndpointPort{Name: "rpc", Port: 2000}, + }, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Insert the service + _, err = client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + + Spec: apiv1.ServiceSpec{ + Type: apiv1.ServiceTypeNodePort, + Ports: []apiv1.ServicePort{ + apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080), NodePort: 30000}, + apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000), NodePort: 30001}, + }, + }, + }) + require.NoError(err) + + // Wait a bit + time.Sleep(300 * time.Millisecond) + + // Verify what we got + syncer.Lock() + defer syncer.Unlock() + actual := syncer.Registrations + require.Len(actual, 2) + require.Equal("foo", actual[0].Service.Service) + require.Equal("4.5.6.7", actual[0].Service.Address) + require.Equal(30000, actual[0].Service.Port) + require.Equal(node1, actual[0].Node) + require.Equal("foo", actual[1].Service.Service) + require.Equal("2.3.4.5", actual[1].Service.Address) + require.Equal(30000, actual[1].Service.Port) + require.Equal(node2, actual[1].Node) + require.NotEqual(actual[0].Service.ID, actual[1].Service.ID) +} + // Test that the proper registrations are generated for a ClusterIP type. func TestServiceResource_clusterIP(t *testing.T) { t.Parallel() diff --git a/subcommand/sync-catalog/command.go b/subcommand/sync-catalog/command.go index 07b8cc0a4514..3d1ba09192cf 100644 --- a/subcommand/sync-catalog/command.go +++ b/subcommand/sync-catalog/command.go @@ -39,7 +39,7 @@ type Command struct { flagK8SWriteNamespace string flagConsulWritePeriod flags.DurationValue flagSyncClusterIPServices bool - flagSyncNodeExternalIP bool + flagNodePortSyncType string once sync.Once help string @@ -74,9 +74,9 @@ func (c *Command) init() { c.flags.BoolVar(&c.flagSyncClusterIPServices, "sync-clusterip-services", true, "If true, all valid ClusterIP services in K8S are synced by default. If false, "+ "ClusterIP services are not synced to Consul.") - c.flags.BoolVar(&c.flagSyncNodeExternalIP, "sync-node-external-ip", true, - "If true, NodePort services will be synced using the node's external ip address. "+ - "If false, NodePort services will be synced with the node's internal ip address.") + c.flags.StringVar(&c.flagNodePortSyncType, "node-port-sync-type", "ExternalOnly", + "Defines the type of sync for NodePort services. Valid options are ExternalOnly, "+ + "InternalOnly and ExternalFirst.") c.http = &flags.HTTPFlags{} c.k8s = &k8sflags.K8SFlags{} @@ -140,13 +140,13 @@ func (c *Command) Run(args []string) int { ctl := &controller.Controller{ Log: hclog.Default().Named("to-consul/controller"), Resource: &catalogFromK8S.ServiceResource{ - Log: hclog.Default().Named("to-consul/source"), - Client: clientset, - Syncer: syncer, - Namespace: c.flagK8SSourceNamespace, - ExplicitEnable: !c.flagK8SDefault, - ClusterIPSync: c.flagSyncClusterIPServices, - NodeExternalIPSync: c.flagSyncNodeExternalIP, + Log: hclog.Default().Named("to-consul/source"), + Client: clientset, + Syncer: syncer, + Namespace: c.flagK8SSourceNamespace, + ExplicitEnable: !c.flagK8SDefault, + ClusterIPSync: c.flagSyncClusterIPServices, + NodePortSync: catalogFromK8S.NodePortSyncType(c.flagNodePortSyncType), }, }