Skip to content

Commit

Permalink
Fix: changing sync annotation=false should delete
Browse files Browse the repository at this point in the history
Previously, if consul.hashicorp.com/service-sync was set to true and
then set to false, the service wouldn't get deleted.

Fixes hashicorp#76
  • Loading branch information
lkysow committed Oct 11, 2019
1 parent 4208b33 commit c9b175b
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 17 deletions.
52 changes: 35 additions & 17 deletions catalog/from-k8s/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,21 @@ type ServiceResource struct {
// ip address will be used instead.
NodePortSync NodePortSyncType

// serviceMap is a mapping of unique key (given by controller) to
// the service structure. endpointsMap is the mapping of the same
// uniqueKey to a set of endpoints.
//
// serviceLock must be held for any read/write to these maps.
serviceLock sync.RWMutex
serviceMap map[string]*apiv1.Service
serviceLock sync.RWMutex

// serviceMap holds services we should sync to Consul. Keys are the
// in the form <kube namespace>/<kube svc name>.
serviceMap map[string]*apiv1.Service

// endpointsMap uses the same keys as serviceMap but maps to the endpoints
// of each service.
endpointsMap map[string]*apiv1.Endpoints
consulMap map[string][]*consulapi.CatalogRegistration

// consulMap holds the services in Consul that we've registered from kube.
// It's populated via Consul's API and lets us diff what is actually in
// Consul vs. what we expect to be there.
consulMap map[string][]*consulapi.CatalogRegistration
}

// Informer implements the controller.Resource interface.
Expand Down Expand Up @@ -109,18 +115,25 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
return nil
}

if !t.shouldSync(service) {
t.Log.Debug("syncing disabled for service, ignoring", "key", key)
return nil
}

t.serviceLock.Lock()
defer t.serviceLock.Unlock()

// Syncing is enabled, let's keep track of this service.
if t.serviceMap == nil {
t.serviceMap = make(map[string]*apiv1.Service)
}

if !t.shouldSync(service) {
// Check if its in our map and delete it.
if _, ok := t.serviceMap[key]; ok {
t.Log.Info("service should no longer be synced", "service", key)
t.doDelete(key)
} else {
t.Log.Debug("syncing disabled for service, ignoring", "key", key)
}
return nil
}

// Syncing is enabled, let's keep track of this service.
t.serviceMap[key] = service

// If we care about endpoints, we should do the initial endpoints load.
Expand Down Expand Up @@ -151,18 +164,23 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
func (t *ServiceResource) Delete(key string) error {
t.serviceLock.Lock()
defer t.serviceLock.Unlock()
t.doDelete(key)
t.Log.Info("delete", "key", key)
return nil
}

// doDelete is a helper function for deletion.
//
// Precondition: assumes t.serviceLock is held
func (t *ServiceResource) doDelete(key string) {
delete(t.serviceMap, key)
delete(t.endpointsMap, key)

// If there were registrations related to this service, then
// delete them and sync.
if _, ok := t.consulMap[key]; ok {
delete(t.consulMap, key)
t.sync()
}

t.Log.Info("delete", "key", key)
return nil
}

// Run implements the controller.Backgrounder interface.
Expand Down
44 changes: 44 additions & 0 deletions catalog/from-k8s/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/hashicorp/consul-k8s/helper/controller"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -196,6 +197,49 @@ func TestServiceResource_system(t *testing.T) {
require.Len(actual, 0)
}

// Test changing the sync tag to false deletes the service.
func TestServiceResource_changeSyncToFalse(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := &TestSyncer{}

// Start the controller
closer := controller.TestControllerRun(&ServiceResource{
Log: hclog.Default(),
Client: client,
Syncer: syncer,
ExplicitEnable: true,
})
defer closer()

// Insert an LB service with the sync=true
svc := testService("foo")
svc.Annotations[annotationServiceSync] = "true"
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(svc)
require.NoError(t, err)

// Verify the service gets registered.
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 1)
})

// Update the sync annotation to false.
svc.Annotations[annotationServiceSync] = "false"
_, err = client.CoreV1().Services(metav1.NamespaceDefault).Update(svc)
require.NoError(t, err)

// Verify the service gets deregistered.
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 0)
})
}

// Test that external IPs take priority.
func TestServiceResource_externalIP(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit c9b175b

Please sign in to comment.