Move cluster node addition/removal out of "experimental"
Implemented a workerqueue for node modifications in the
PortAllocator, so that if the master happens to go
down, the operations the PortAllocator needs to perform
to keep allocations consistent are retried until they
result in a correct state.

Closes #60
markmandel committed Jun 19, 2018
1 parent f4877f6 commit ad83f37
Showing 3 changed files with 106 additions and 38 deletions.
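
At a glance, the change stops calling the port sync directly from the informer event handlers and instead enqueues keys onto a workerqueue (agones.dev/agones/pkg/util/workerqueue), so a sync that fails (for example while the master is down) is retried instead of lost. Below is a minimal sketch of that retry pattern, written against client-go's generic workqueue package rather than the Agones wrapper; the syncPorts stub and processNextItem helper are illustrative assumptions, not the code in this commit.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited queue: failed keys are re-added with exponential backoff.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()

	// Stand-in for PortAllocator.syncPorts: it can fail transiently, e.g.
	// while the master/API server is unreachable.
	syncPorts := func(key string) error {
		fmt.Println("syncing ports for", key)
		return nil
	}

	// processNextItem mirrors what a workerqueue worker does for each key.
	processNextItem := func() bool {
		key, shutdown := queue.Get()
		if shutdown {
			return false
		}
		defer queue.Done(key)

		if err := syncPorts(key.(string)); err != nil {
			// On error, requeue with backoff so the operation is retried
			// until the cluster state is correct again.
			queue.AddRateLimited(key)
			return true
		}
		// On success, clear the backoff history for this key.
		queue.Forget(key)
		return true
	}

	// The informer event handlers would enqueue a node name (or a
	// sync-all key) here.
	queue.Add("node-1")

	for queue.Len() > 0 {
		if !processNextItem() {
			break
		}
	}
}
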
8 changes: 5 additions & 3 deletions pkg/gameservers/controller.go
@@ -232,9 +232,11 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
}

// Run the Port Allocator
if err := c.portAllocator.Run(stop); err != nil {
return err
}
go func() {
if err := c.portAllocator.Run(stop); err != nil {
c.logger.WithError(err).Error("error running the port allocator")
}
}()

// Run the Health Controller
go c.healthController.Run(stop)
61 changes: 43 additions & 18 deletions pkg/gameservers/portallocator.go
@@ -17,10 +17,12 @@ package gameservers
import (
"sync"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -31,6 +33,11 @@ import (
"k8s.io/client-go/tools/cache"
)

// syncAllKey is the queue key to sync all the ports.
// The + symbol is deliberate, as it can't be used in a K8s
// naming scheme
const syncAllKey = cache.ExplicitKey("SYNC+ALL")

// ErrPortNotFound is returned when a port is unable to be allocated
var ErrPortNotFound = errors.New("Unable to allocate a port")

@@ -56,6 +63,7 @@ type PortAllocator struct {
nodeSynced cache.InformerSynced
nodeLister corelisterv1.NodeLister
nodeInformer cache.SharedIndexInformer
workerqueue *workerqueue.WorkerQueue
}

// NewPortAllocator returns a new dynamic port
@@ -83,31 +91,27 @@ func NewPortAllocator(minPort, maxPort int32,
nodeSynced: nodes.Informer().HasSynced,
}
pa.logger = runtime.NewLoggerWithType(pa)
pa.workerqueue = workerqueue.NewWorkerQueue(pa.syncPorts, pa.logger, stable.GroupName+".PortAllocator")

pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: pa.syncDeleteGameServer,
})

// Experimental support for node adding/removal
pa.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pa.syncAddNode,
AddFunc: func(obj interface{}) {
node := obj.(*corev1.Node)
pa.workerqueue.Enqueue(cache.ExplicitKey(node.Name))
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
err := pa.syncPortAllocations()
if err != nil {
err = errors.Wrap(err, "error resetting ports on node update")
runtime.HandleError(pa.logger.WithField("node", newNode), err)
}
pa.workerqueue.Enqueue(syncAllKey)
}
},
DeleteFunc: func(obj interface{}) {
err := pa.syncPortAllocations()
if err != nil {
err = errors.Wrap(err, "error on node deletion")
runtime.HandleError(pa.logger.WithField("node", obj), err)
}
DeleteFunc: func(_ interface{}) {
pa.workerqueue.Enqueue(syncAllKey)
},
})

@@ -116,15 +120,37 @@ func NewPortAllocator(minPort, maxPort int32,
}

// Run sets up the current state of port allocations and
// starts tracking Pod and Node changes (non blocking)
// starts tracking Pod and Node changes
func (pa *PortAllocator) Run(stop <-chan struct{}) error {
pa.logger.Info("Running")

if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
return errors.New("failed to wait for caches to sync")
}

return pa.syncPortAllocations()
// on Run, make sure we start with a clean slate straight away
if err := pa.syncAll(); err != nil {
return errors.Wrap(err, "error performing initial sync")
}

pa.workerqueue.Run(1, stop)
return nil
}

// syncPorts synchronises ports for the given key
func (pa *PortAllocator) syncPorts(key string) error {
if key == string(syncAllKey) {
return pa.syncAll()
}

// if we get a specific node name, we add that node's ports
node, err := pa.nodeLister.Get(key)
if err != nil {
return errors.Wrapf(err, "error retrieving node %s", key)
}
pa.syncAddNode(node)

return nil
}

// Allocate assigns a port to the GameServer and returns it.
@@ -171,8 +197,7 @@ func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {

// syncAddNode adds another node port section
// to the available ports
func (pa *PortAllocator) syncAddNode(obj interface{}) {
node := obj.(*corev1.Node)
func (pa *PortAllocator) syncAddNode(node *corev1.Node) {
// if we've already added this node, don't do it again
if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
@@ -201,13 +226,13 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
}
}

// syncPortAllocations syncs the pod, node and gameserver caches then
// syncAll syncs the pod, node and gameserver caches then
// traverses all Nodes in the cluster, and looks at GameServers
// and Terminating Pods, to make sure those
// portAllocations are marked as taken.
// Locks the mutex while doing this.
// This is basically a stop the world Garbage Collection on port allocations.
func (pa *PortAllocator) syncPortAllocations() error {
func (pa *PortAllocator) syncAll() error {
pa.mutex.Lock()
defer pa.mutex.Unlock()

75 changes: 58 additions & 17 deletions pkg/gameservers/portallocator_test.go
@@ -16,9 +16,9 @@ package gameservers

import (
"fmt"
"testing"

"sync"
"testing"
"time"

"agones.dev/agones/pkg/apis/stable/v1alpha1"
agtesting "agones.dev/agones/pkg/testing"
@@ -48,14 +48,15 @@ func TestPortAllocatorAllocate(t *testing.T) {
nodeWatch := watch.NewFake()
m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil))

stop, cancel := agtesting.StartInformers(m)
stop, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()

// Make sure the adds don't corrupt the sync
nodeWatch.Add(&n1)
nodeWatch.Add(&n2)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))

err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)

// two nodes
@@ -83,9 +84,9 @@ func TestPortAllocatorAllocate(t *testing.T) {
nl := &corev1.NodeList{Items: []corev1.Node{n1}}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)
var ports []int32
for i := 10; i <= 20; i++ {
@@ -106,9 +107,9 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) {
nl := &corev1.NodeList{Items: []corev1.Node{n1, n2}}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)
wg := sync.WaitGroup{}

@@ -139,9 +140,9 @@ func TestPortAllocatorDeAllocate(t *testing.T) {
nl := &corev1.NodeList{Items: nodes}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)

for i := 0; i <= 100; i++ {
@@ -199,12 +200,12 @@ func TestPortAllocatorSyncPortAllocations(t *testing.T) {
return true, gsl, nil
})

stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.gameServerSynced, pa.nodeSynced)
defer cancel()

err := pa.Run(stop)

err := pa.syncAll()
assert.Nil(t, err)

assert.Len(t, pa.portAllocations, 3)
assert.Len(t, pa.gameServerRegistry, 5)

@@ -247,14 +248,16 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) {
return true, nl, nil
})

stop, cancel := agtesting.StartInformers(m)
stop, cancel := agtesting.StartInformers(m, pa.gameServerSynced, pa.nodeSynced)
defer cancel()

gsWatch.Add(gs1.DeepCopy())
gsWatch.Add(gs2.DeepCopy())
gsWatch.Add(gs3.DeepCopy())

err := pa.Run(stop)
assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced))

err := pa.syncAll()
assert.Nil(t, err)

// gate
@@ -291,15 +294,40 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil))
m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil))

received := make(chan string, 10)
defer close(received)

f := pa.workerqueue.SyncHandler
pa.workerqueue.SyncHandler = func(s string) error {
err := f(s)
assert.Nil(t, err, "sync handler failed")
received <- s
return nil
}

stop, cancel := agtesting.StartInformers(m)
defer cancel()

// Make sure the adds don't corrupt the sync
nodeWatch.Add(&n1)
nodeWatch.Add(&n2)

err := pa.Run(stop)
assert.Nil(t, err)
go func() {
err := pa.Run(stop)
assert.Nil(t, err)
}()

testReceived := func(expected, failMsg string) {
select {
case key := <-received:
assert.Equal(t, expected, key)
case <-time.After(3 * time.Second):
assert.FailNow(t, failMsg, "expected: %s", expected)
}
}

testReceived(n1.ObjectMeta.Name, "add node 1")
testReceived(n2.ObjectMeta.Name, "add node 2")

// add a game server
gs, err := pa.Allocate(fixture.DeepCopy())
@@ -317,6 +345,7 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
logrus.Info("adding n3")
nodeWatch.Add(&n3)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(n3.ObjectMeta.Name, "add node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 3)
@@ -330,6 +359,8 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
assert.True(t, copy.Spec.Unschedulable)
nodeWatch.Modify(copy)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "unscheduled node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
@@ -341,6 +372,8 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
copy.Spec.Unschedulable = false
nodeWatch.Modify(copy)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "scheduled node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 3)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
@@ -350,13 +383,21 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
logrus.Info("deleting n3")
nodeWatch.Delete(n3.DeepCopy())
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "deleting node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
pa.mutex.RUnlock()

// add the n1 node again; it shouldn't do anything
nodeWatch.Add(&n1)
select {
case <-received:
assert.FailNow(t, "adding back n1: event should not happen")
case <-time.After(time.Second):
}

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
