Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nit fixes #358

Merged
merged 2 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *Controller) IsHealthy() error {
// check if last seen service and endpoint processing is more than an hour ago
if c.syncTracker.Get().Before(time.Now().Add(-time.Hour)) {
msg := fmt.Sprintf("NEG controller has not proccessed any service and endpoint updates for more than an hour. Something went wrong. Last sync was on %v", c.syncTracker.Get())
msg := fmt.Sprintf("NEG controller has not processed any service "+
"and endpoint updates for more than an hour. Something went wrong. "+
"Last sync was on %v", c.syncTracker.Get())
glog.Error(msg)
return fmt.Errorf(msg)
}
Expand Down Expand Up @@ -240,7 +242,7 @@ func (c *Controller) processService(key string) error {
if !enabled {
c.manager.StopSyncer(namespace, name)
// delete the annotation
return c.syncNegAnnotation(namespace, name, service, make(PortNameMap))
return c.syncNegStatusAnnotation(namespace, name, service, make(PortNameMap))
}

glog.V(2).Infof("Syncing service %q", key)
Expand Down Expand Up @@ -272,19 +274,20 @@ func (c *Controller) processService(key string) error {
svcPortMap = svcPortMap.Union(negSvcPorts)
}

err = c.syncNegAnnotation(namespace, name, service, svcPortMap)
err = c.syncNegStatusAnnotation(namespace, name, service, svcPortMap)
if err != nil {
return err
}
return c.manager.EnsureSyncers(namespace, name, svcPortMap)
}

func (c *Controller) syncNegAnnotation(namespace, name string, service *apiv1.Service, portMap PortNameMap) error {
func (c *Controller) syncNegStatusAnnotation(namespace, name string, service *apiv1.Service, portMap PortNameMap) error {
zones, err := c.zoneGetter.ListZones()
if err != nil {
return err
}

// Remove NEG Status Annotation when no NEG is needed
if len(portMap) == 0 {
if _, ok := service.Annotations[annotations.NEGStatusKey]; ok {
// TODO: use PATCH to remove annotation
Expand All @@ -297,24 +300,23 @@ func (c *Controller) syncNegAnnotation(namespace, name string, service *apiv1.Se
}

portToNegs := make(PortNameMap)
for svcPort, _ := range portMap {
for svcPort := range portMap {
portToNegs[svcPort] = c.namer.NEG(namespace, name, svcPort)
}
negSvcState := GetNegStatus(zones, portToNegs)
formattedAnnotation, err := json.Marshal(negSvcState)
bytes, err := json.Marshal(negSvcState)
if err != nil {
return err
}

annotation := string(formattedAnnotation)

annotation := string(bytes)
existingAnnotation, ok := service.Annotations[annotations.NEGStatusKey]
if ok && existingAnnotation == annotation {
return nil
}

service.Annotations[annotations.NEGStatusKey] = annotation
glog.V(2).Infof("Updating NEG visibility annotation %q on service %s/%s.", annotation, namespace, name)
// TODO: use PATCH to Update Annotation
return c.serviceLister.Update(service)
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,20 +368,20 @@ func TestSyncNegAnnotation(t *testing.T) {
t.Fatalf("Service was not retrieved successfully, err: %v", err)
}

controller.syncNegAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.previousPortMap)
controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.previousPortMap)
svc, _, _ = controller.serviceLister.GetByKey(svcKey)

var oldSvcPorts []int32
for port, _ := range tc.previousPortMap {
for port := range tc.previousPortMap {
oldSvcPorts = append(oldSvcPorts, port)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), oldSvcPorts)

controller.syncNegAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.portMap)
controller.syncNegStatusAnnotation(testServiceNamespace, testServiceName, svc.(*apiv1.Service), tc.portMap)
svc, _, _ = controller.serviceLister.GetByKey(svcKey)

var svcPorts []int32
for port, _ := range tc.portMap {
for port := range tc.portMap {
svcPorts = append(svcPorts, port)
}
validateServiceStateAnnotation(t, svc.(*apiv1.Service), svcPorts)
Expand Down
30 changes: 17 additions & 13 deletions pkg/neg/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (s *syncer) init() {
// Start starts the syncer go routine if it has not been started.
func (s *syncer) Start() error {
if !s.IsStopped() {
return fmt.Errorf("NEG syncer for %s/%s-%v/%s is already running.", s.namespace, s.name, s.port, s.targetPort)
return fmt.Errorf("NEG syncer for %s is already running.", s.formattedName())
}
if s.IsShuttingDown() {
return fmt.Errorf("NEG syncer for %s/%s-%v/%s is shutting down. ", s.namespace, s.name, s.port, s.targetPort)
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.formattedName())
}

glog.V(2).Infof("Starting NEG syncer for service port %s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort)
glog.V(2).Infof("Starting NEG syncer for service port %s", s.formattedName())
s.init()
go func() {
for {
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s *syncer) Start() error {
s.stateLock.Lock()
s.shuttingDown = false
s.stateLock.Unlock()
glog.V(2).Infof("Stopping NEG syncer for %s/%s-%v-%s", s.namespace, s.name, s.port, s.targetPort)
glog.V(2).Infof("Stopping NEG syncer for %s", s.formattedName())
return
}
case <-retryCh:
Expand All @@ -152,7 +152,7 @@ func (s *syncer) Stop() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
if !s.stopped {
glog.V(2).Infof("Stopping NEG syncer for service port %s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort)
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.formattedName())
s.stopped = true
s.shuttingDown = true
close(s.syncCh)
Expand All @@ -162,7 +162,7 @@ func (s *syncer) Stop() {
// Sync informs syncer to run sync loop as soon as possible.
func (s *syncer) Sync() bool {
if s.IsStopped() {
glog.Warningf("NEG syncer for %s/%s-%s is already stopped.", s.namespace, s.name, s.targetPort)
glog.Warningf("NEG syncer for %s is already stopped.", s.formattedName())
return false
}
select {
Expand All @@ -187,10 +187,10 @@ func (s *syncer) IsShuttingDown() bool {

func (s *syncer) sync() (err error) {
if s.IsStopped() || s.IsShuttingDown() {
glog.V(4).Infof("Skip syncing NEG %q for %s/%s-%s.", s.negName, s.namespace, s.name, s.targetPort)
glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.formattedName())
return nil
}
glog.V(2).Infof("Sync NEG %q for %s/%s-%s", s.negName, s.namespace, s.name, s.targetPort)
glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.formattedName())
start := time.Now()
defer observeNegSync(s.negName, attachSync, err, start)
ep, exists, err := s.endpointLister.Get(
Expand Down Expand Up @@ -263,13 +263,13 @@ func (s *syncer) ensureNetworkEndpointGroups() error {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s/%s-%s in %q.", s.negName, s.namespace, s.name, s.targetPort, zone)
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
}

if needToCreate {
glog.V(2).Infof("Creating NEG %q for %s/%s in %q.", s.negName, s.namespace, s.name, zone)
glog.V(2).Infof("Creating NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
err = s.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: s.negName,
Type: gce.NEGLoadBalancerType,
Expand All @@ -283,7 +283,7 @@ func (s *syncer) ensureNetworkEndpointGroups() error {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s/%s-%s in %q.", s.negName, s.namespace, s.name, s.targetPort, zone)
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
}
Expand Down Expand Up @@ -436,13 +436,13 @@ func (s *syncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.Netwo

func (s *syncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Attaching %d endpoint(s) for %s/%s-%s into NEG %s in %s.", len(networkEndpoints), s.namespace, s.name, s.targetPort, s.negName, zone)
glog.V(2).Infof("Attaching %d endpoint(s) for %s into NEG %s in %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.AttachNetworkEndpoints, "Attach")
}

func (s *syncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Detaching %d endpoint(s) for %s/%s-%s into NEG %s in %s.", len(networkEndpoints), s.namespace, s.name, s.targetPort, s.negName, zone)
glog.V(2).Infof("Detaching %d endpoint(s) for %s into NEG %s in %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.DetachNetworkEndpoints, "Detach")
}

Expand Down Expand Up @@ -477,6 +477,10 @@ func (s *syncer) resetRetryDelay() {
s.lastRetryDelay = time.Duration(0)
}

func (s *syncer) formattedName() string {
return fmt.Sprintf("%s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort)
}

// encodeEndpoint encodes ip and instance into a single string
func encodeEndpoint(ip, instance, port string) string {
return fmt.Sprintf("%s||%s||%s", ip, instance, port)
Expand Down