Skip to content

Commit

Permalink
Merge pull request hashicorp#45 from hashicorp/nodeport-syncing
Browse files Browse the repository at this point in the history
Nodeport syncing updates
  • Loading branch information
Rebecca Zanzig authored Jan 8, 2019
2 parents 3ad0005 + eca4b3b commit 947bc1a
Show file tree
Hide file tree
Showing 3 changed files with 800 additions and 66 deletions.
159 changes: 139 additions & 20 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ const (
ConsulK8STag = "k8s"
)

type NodePortSyncType string

const (
// Only sync NodePort services with a node's ExternalIP address.
// Doesn't sync if an ExternalIP doesn't exist
ExternalOnly NodePortSyncType = "ExternalOnly"

// Sync with an ExternalIP first, if it doesn't exist, use the
// node's InternalIP address instead
ExternalFirst NodePortSyncType = "ExternalFirst"

// Sync NodePort services using
InternalOnly NodePortSyncType = "InternalOnly"
)

// ServiceResource implements controller.Resource to sync Service resource
// types from K8S.
type ServiceResource struct {
Expand All @@ -47,6 +62,11 @@ type ServiceResource struct {
// Setting this to false will ignore ClusterIP services during the sync.
ClusterIPSync bool

// NodeExternalIPSync set to true (the default) syncs NodePort services
// using the node's external ip address. When false, the node's internal
// ip address will be used instead.
NodePortSync NodePortSyncType

// serviceMap is a mapping of unique key (given by controller) to
// the service structure. endpointsMap is the mapping of the same
// uniqueKey to a set of endpoints.
Expand Down Expand Up @@ -252,35 +272,60 @@ func (t *ServiceResource) generateRegistrations(key string) {
baseService.Service = strings.TrimSpace(v)
}

// Determine the default port
// Determine the default port and set port annotations
if len(svc.Spec.Ports) > 0 {
nodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort
main := svc.Spec.Ports[0].Name
// Create port variable, defaults to 0
var port int

// Flag identifying whether the service is of NodePort type
isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort

// If a specific port is specified, then use that port value
if target, ok := svc.Annotations[annotationServicePort]; ok {
main = target
target, ok := svc.Annotations[annotationServicePort]
if ok {
if v, err := strconv.ParseInt(target, 0, 0); err == nil {
baseService.Port = int(v)
port = int(v)
}
}

// Go through the ports so we can add them to the service meta. We
// also use this opportunity to find our default port.
for _, p := range svc.Spec.Ports {
target := p.Port
if nodePort && p.NodePort > 0 {
target = p.NodePort
// For when the port was a name instead of an int
if port == 0 && target != "" {
// Find the named port
for _, p := range svc.Spec.Ports {
if p.Name == target {
// Pick the right port based on the type of service
if isNodePort && p.NodePort > 0 {
port = int(p.NodePort)
} else {
port = int(p.Port)
}
}
}
}

// Set the tag
baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(target), 10)

// If the name matches our main port, set our main port
if p.Name == main {
baseService.Port = int(target)
// If the port was not set above, set it with the first port
// based on the service type
if port == 0 && isNodePort {
// Find first defined NodePort
for _, p := range svc.Spec.Ports {
if p.NodePort > 0 {
port = int(p.NodePort)
break
}
}
}
if port == 0 && !isNodePort {
port = int(svc.Spec.Ports[0].Port)
}

// Set service port based on defined port
baseService.Port = port

// Add all the ports as annotations
for _, p := range svc.Spec.Ports {
// Set the tag
baseService.Meta["port-"+p.Name] = strconv.FormatInt(int64(p.Port), 10)
}
}

// Parse any additional tags
Expand Down Expand Up @@ -350,9 +395,83 @@ func (t *ServiceResource) generateRegistrations(key string) {
}

// For NodePort services, we create a service instance for each
// endpoint of the service. This way we don't register _every_ K8S
// endpoint of the service, which corresponds to the nodes the service's
// pods are running on. This way we don't register _every_ K8S
// node as part of the service.
case apiv1.ServiceTypeNodePort, apiv1.ServiceTypeClusterIP:
case apiv1.ServiceTypeNodePort:
if t.endpointsMap == nil {
return
}

endpoints := t.endpointsMap[key]
if endpoints == nil {
return
}

for _, subset := range endpoints.Subsets {
for _, subsetAddr := range subset.Addresses {
// Check that the node name exists
// subsetAddr.NodeName is of type *string
if subsetAddr.NodeName == nil {
continue
}

// Look up the node's ip address by getting node info
node, err := t.Client.CoreV1().Nodes().Get(*subsetAddr.NodeName, metav1.GetOptions{})
if err != nil {
t.Log.Warn("error getting node info", "error", err)
continue
}

// Set the expected node address type
var expectedType apiv1.NodeAddressType
if t.NodePortSync == InternalOnly {
expectedType = apiv1.NodeInternalIP
} else {
expectedType = apiv1.NodeExternalIP
}

// Find the ip address for the node and
// create the Consul service using it
var found bool
for _, address := range node.Status.Addresses {
if address.Type == expectedType {
found = true
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, address.Address)
r.Service.Address = address.Address
r.Node = *subsetAddr.NodeName
r.Address = address.Address

t.consulMap[key] = append(t.consulMap[key], &r)
}
}

// If an ExternalIP wasn't found, and ExternalFirst is set,
// use an InternalIP
if t.NodePortSync == ExternalFirst && !found {
for _, address := range node.Status.Addresses {
if address.Type == apiv1.NodeInternalIP {
r := baseNode
rs := baseService
r.Service = &rs
r.Service.ID = serviceID(r.Service.Service, address.Address)
r.Service.Address = address.Address
r.Node = *subsetAddr.NodeName
r.Address = address.Address

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
}
}
}

// For ClusterIP services, we register a service instance
// for each pod.
case apiv1.ServiceTypeClusterIP:
if t.endpointsMap == nil {
return
}
Expand Down
Loading

0 comments on commit 947bc1a

Please sign in to comment.