Skip to content

Commit

Permalink
catalog/from-k8s: adds support for ClusterIP Service
Browse files Browse the repository at this point in the history
  • Loading branch information
jipperinbham committed Oct 18, 2018
1 parent deafd3e commit b2f9ed5
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 2 deletions.
58 changes: 56 additions & 2 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool {
return false
}

return svc.Spec.Type == apiv1.ServiceTypeNodePort
switch svc.Spec.Type {
case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP:
return true
}
return false
}

// generateRegistrations generates the necessary Consul registrations for
Expand Down Expand Up @@ -243,10 +247,11 @@ func (t *ServiceResource) generateRegistrations(key string) {
baseService.Service = strings.TrimSpace(v)
}

var main string
// Determine the default port
if len(svc.Spec.Ports) > 0 {
nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort
main := svc.Spec.Ports[0].Name
main = svc.Spec.Ports[0].Name

// If a specific port is specified, then use that port value
if target, ok := svc.Annotations[annotationServicePort]; ok {
Expand Down Expand Up @@ -313,6 +318,55 @@ func (t *ServiceResource) generateRegistrations(key string) {
}

switch svc.Spec.Type {
case apiv1.ServiceTypeClusterIP:
if t.endpointsMap == nil {
return
}

endpoints := t.endpointsMap[key]
if endpoints == nil {
return
}

seen := map[string]struct{}{}
for _, subset := range endpoints.Subsets {
for _, subsetAddr := range subset.Addresses {
addr := subsetAddr.IP
if addr == "" {
addr = subsetAddr.Hostname
}
if addr == "" {
continue
}

// Its not clear whether K8S guarantees ready addresses to
// be unique so we maintain a set to prevent duplicates just
// in case.
if _, ok := seen[addr]; ok {
continue
}
seen[addr] = struct{}{}

for _, port := range subset.Ports {
if port.Name == main {
baseService.Port = int(port.Port)
break
}
}

r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, addr)
r.Service.Address = addr
if subsetAddr.NodeName != nil {
r.Node = *subsetAddr.NodeName
r.Address = addr
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
// For LoadBalancer type services, we create a service instance for
// each LoadBalancer entry. We only support entries that have an IP
// address assigned (not hostnames).
Expand Down
241 changes: 241 additions & 0 deletions catalog/from-k8s/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,247 @@ func TestServiceResource_nodePortInitial(t *testing.T) {
require.Equal("2.3.4.5", actual[1].Service.Address)
}

// Test that the proper registrations are generated for a ClusterIP type.
func TestServiceResource_clusterIP(t *testing.T) {
t.Parallel()
require := require.New(t)
client := fake.NewSimpleClientset()
syncer := &TestSyncer{}

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
})
defer closer()

// Insert the service
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeClusterIP,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Port: 80, TargetPort: intstr.FromInt(8080)},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Insert the endpoints
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Port: 8080},
},
},

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Port: 8080},
},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Verify what we got
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(actual, 2)
require.Equal("foo", actual[0].Service.Service)
require.Equal("1.2.3.4", actual[0].Service.Address)
require.Equal(8080, actual[0].Service.Port)
require.Equal("foo", actual[1].Service.Service)
require.Equal("2.3.4.5", actual[1].Service.Address)
require.Equal(8080, actual[1].Service.Port)
require.NotEqual(actual[0].Service.ID, actual[1].Service.ID)
}

// Test that the proper registrations are generated for a ClusterIP type with multiple ports.
func TestServiceResource_clusterIPMultiEndpoint(t *testing.T) {
t.Parallel()
require := require.New(t)
client := fake.NewSimpleClientset()
syncer := &TestSyncer{}

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
})
defer closer()

// Insert the service
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeClusterIP,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Insert the endpoints
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Verify what we got
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(actual, 2)
require.Equal("foo", actual[0].Service.Service)
require.Equal("1.2.3.4", actual[0].Service.Address)
require.Equal(8080, actual[0].Service.Port)
require.Equal("foo", actual[1].Service.Service)
require.Equal("2.3.4.5", actual[1].Service.Address)
require.Equal(8080, actual[1].Service.Port)
require.NotEqual(actual[0].Service.ID, actual[1].Service.ID)
}

// Test that the proper registrations are generated for a ClusterIP type with annotated override.
func TestServiceResource_clusterIPAnnotatedPort(t *testing.T) {
t.Parallel()
require := require.New(t)
client := fake.NewSimpleClientset()
syncer := &TestSyncer{}

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
})
defer closer()

// Insert the service
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(&apiv1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Annotations: map[string]string{annotationServicePort: "rpc"},
},

Spec: apiv1.ServiceSpec{
Type: apiv1.ServiceTypeClusterIP,
Ports: []apiv1.ServicePort{
apiv1.ServicePort{Name: "http", Port: 80, TargetPort: intstr.FromInt(8080)},
apiv1.ServicePort{Name: "rpc", Port: 8500, TargetPort: intstr.FromInt(2000)},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Insert the endpoints
_, err = client.CoreV1().Endpoints(metav1.NamespaceDefault).Create(&apiv1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},

Subsets: []apiv1.EndpointSubset{
apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "1.2.3.4"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},

apiv1.EndpointSubset{
Addresses: []apiv1.EndpointAddress{
apiv1.EndpointAddress{IP: "2.3.4.5"},
},
Ports: []apiv1.EndpointPort{
apiv1.EndpointPort{Name: "http", Port: 8080},
apiv1.EndpointPort{Name: "rpc", Port: 2000},
},
},
},
})
require.NoError(err)

// Wait a bit
time.Sleep(300 * time.Millisecond)

// Verify what we got
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(actual, 2)
require.Equal("foo", actual[0].Service.Service)
require.Equal("1.2.3.4", actual[0].Service.Address)
require.Equal(2000, actual[0].Service.Port)
require.Equal("foo", actual[1].Service.Service)
require.Equal("2.3.4.5", actual[1].Service.Address)
require.Equal(2000, actual[1].Service.Port)
require.NotEqual(actual[0].Service.ID, actual[1].Service.ID)
}

// testService returns a service that will result in a registration.
func testService(name string) *apiv1.Service {
return &apiv1.Service{
Expand Down

0 comments on commit b2f9ed5

Please sign in to comment.