Move cluster node addition/removal out of "experimental" #271

Merged
2 commits merged on Jun 19, 2018
8 changes: 5 additions & 3 deletions pkg/gameservers/controller.go
@@ -232,9 +232,11 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
}

// Run the Port Allocator
if err := c.portAllocator.Run(stop); err != nil {
return err
}
go func() {
if err := c.portAllocator.Run(stop); err != nil {
c.logger.WithError(err).Error("error running the port allocator")
}
}()

// Run the Health Controller
go c.healthController.Run(stop)
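For context on the change above: PortAllocator.Run is now blocking (its doc comment in the next file drops the "(non blocking)" note, since it runs a worker queue until the stop channel closes), so the controller launches it in its own goroutine and logs any error rather than aborting startup. A minimal sketch of that shape, using hypothetical names rather than the Agones API:

// Sketch only (hypothetical names, not the Agones API): a blocking Run
// started in its own goroutine so the caller can keep starting other workers.
package main

import (
	"log"
	"time"
)

type allocator struct{}

// Run blocks until stop is closed, mirroring the new PortAllocator.Run shape.
func (a *allocator) Run(stop <-chan struct{}) error {
	<-stop
	return nil
}

func main() {
	stop := make(chan struct{})
	a := &allocator{}

	go func() {
		if err := a.Run(stop); err != nil {
			log.Printf("error running the allocator: %v", err)
		}
	}()

	// ... the controller would start its other workers here ...
	time.Sleep(10 * time.Millisecond)
	close(stop)
}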
61 changes: 43 additions & 18 deletions pkg/gameservers/portallocator.go
@@ -17,10 +17,12 @@ package gameservers
import (
"sync"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -31,6 +33,11 @@ import (
"k8s.io/client-go/tools/cache"
)

// syncAllKey is the queue key to sync all the ports.
// the + symbol is deliberate, as it can't be used in a K8s
// naming scheme
const syncAllKey = cache.ExplicitKey("SYNC+ALL")

// ErrPortNotFound is returned when a port is unable to be allocated
var ErrPortNotFound = errors.New("Unable to allocate a port")

@@ -56,6 +63,7 @@ type PortAllocator struct {
nodeSynced cache.InformerSynced
nodeLister corelisterv1.NodeLister
nodeInformer cache.SharedIndexInformer
workerqueue *workerqueue.WorkerQueue
}

// NewPortAllocator returns a new dynamic port
@@ -83,31 +91,27 @@ func NewPortAllocator(minPort, maxPort int32,
nodeSynced: nodes.Informer().HasSynced,
}
pa.logger = runtime.NewLoggerWithType(pa)
pa.workerqueue = workerqueue.NewWorkerQueue(pa.syncPorts, pa.logger, stable.GroupName+".PortAllocator")

pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: pa.syncDeleteGameServer,
})

// Experimental support for node adding/removal
pa.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pa.syncAddNode,
AddFunc: func(obj interface{}) {
node := obj.(*corev1.Node)
Contributor:
I don't know the semantics of the event handler, but is there a chance that obj isn't of *corev1.Node type? In other words, is this a safe type assertion?

Member Author:
A good question - basically, there isn't really. But a good question!
If something other than a Node comes through a Node event handler, then something very weird has happened -- at which point, it really should panic.
Here's another example in the Kubernetes sample-controller doing a similar thing:
https://github.com/kubernetes/sample-controller/blob/master/controller.go#L129

Contributor:
Then panicking is exactly the right thing to do.
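As a side note to the thread above (a sketch only, not part of this diff; it assumes the k8s.io/api and k8s.io/apimachinery modules are available): the two ways an informer handler can treat the incoming object. The bare assertion panics on an unexpected type, which is the behaviour the thread settles on; the comma-ok form is the defensive alternative.

// Sketch contrasting the two assertion styles discussed above.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// strictNodeName mirrors the handler in this PR: anything other than a
// *corev1.Node coming through a Node informer is a programming error, so the
// bare assertion is allowed to panic.
func strictNodeName(obj interface{}) string {
	node := obj.(*corev1.Node) // panics if obj is not a *corev1.Node
	return node.ObjectMeta.Name
}

// lenientNodeName is the defensive alternative using the comma-ok form,
// letting the caller log and drop the event instead of panicking.
func lenientNodeName(obj interface{}) (string, bool) {
	node, ok := obj.(*corev1.Node)
	if !ok {
		return "", false
	}
	return node.ObjectMeta.Name, true
}

func main() {
	n := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
	fmt.Println(strictNodeName(n))
	if name, ok := lenientNodeName(n); ok {
		fmt.Println(name)
	}
}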

pa.workerqueue.Enqueue(cache.ExplicitKey(node.Name))
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
err := pa.syncPortAllocations()
if err != nil {
err = errors.Wrap(err, "error resetting ports on node update")
runtime.HandleError(pa.logger.WithField("node", newNode), err)
}
pa.workerqueue.Enqueue(syncAllKey)
}
},
DeleteFunc: func(obj interface{}) {
err := pa.syncPortAllocations()
if err != nil {
err = errors.Wrap(err, "error on node deletion")
runtime.HandleError(pa.logger.WithField("node", obj), err)
}
DeleteFunc: func(_ interface{}) {
pa.workerqueue.Enqueue(syncAllKey)
},
})

@@ -116,15 +120,37 @@ func NewPortAllocator(minPort, maxPort int32,
}

// Run sets up the current state of port allocations and
// starts tracking Pod and Node changes (non blocking)
// starts tracking Pod and Node changes
func (pa *PortAllocator) Run(stop <-chan struct{}) error {
pa.logger.Info("Running")

if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
return errors.New("failed to wait for caches to sync")
}

return pa.syncPortAllocations()
// on run, let's make sure we start with a perfect slate straight away
if err := pa.syncAll(); err != nil {
return errors.Wrap(err, "error performing initial sync")
}

pa.workerqueue.Run(1, stop)
return nil
}

// syncPorts synchronises ports for the given key
func (pa *PortAllocator) syncPorts(key string) error {
if key == string(syncAllKey) {
return pa.syncAll()
}

// if we get a specific node name, we add some ports
node, err := pa.nodeLister.Get(key)
if err != nil {
return errors.Wrapf(err, "error retrieving node %s", key)
}
pa.syncAddNode(node)

return nil
}

// Allocate assigns a port to the GameServer and returns it.
@@ -171,8 +197,7 @@ func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {

// syncAddNode adds another node port section
// to the available ports
func (pa *PortAllocator) syncAddNode(obj interface{}) {
node := obj.(*corev1.Node)
func (pa *PortAllocator) syncAddNode(node *corev1.Node) {
// if we've already added this node, don't do it again
if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
@@ -201,13 +226,13 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
}
}

// syncPortAllocations syncs the pod, node and gameserver caches then
// syncAll syncs the pod, node and gameserver caches then
// traverses all Nodes in the cluster and looks at GameServers
// and Terminating Pods values to make sure those
// portAllocations are marked as taken.
// Locks the mutex while doing this.
// This is basically a stop the world Garbage Collection on port allocations.
func (pa *PortAllocator) syncPortAllocations() error {
func (pa *PortAllocator) syncAll() error {
pa.mutex.Lock()
defer pa.mutex.Unlock()

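A brief aside on the pattern above (a sketch only, with illustrative names rather than the real Agones types): every node event is reduced to a string key on a single worker queue. A plain key is treated as a node name and only adds that node's ports, while the reserved SYNC+ALL sentinel (safe because '+' cannot appear in a Kubernetes object name) forces the full stop-the-world resync.

// Minimal sketch of the key-dispatch pattern used by syncPorts above.
package main

import "fmt"

// syncAllKey cannot collide with a node name, since '+' is not a legal
// character in Kubernetes object names.
const syncAllKey = "SYNC+ALL"

type dispatcher struct {
	syncAll  func() error
	syncNode func(name string) error
}

// syncPorts is the single handler the worker queue would call for every key.
func (d *dispatcher) syncPorts(key string) error {
	if key == syncAllKey {
		return d.syncAll()
	}
	return d.syncNode(key)
}

func main() {
	d := &dispatcher{
		syncAll:  func() error { fmt.Println("resyncing all port allocations"); return nil },
		syncNode: func(name string) error { fmt.Println("adding ports for node", name); return nil },
	}

	// Node additions enqueue the node name; unschedulable or deleted nodes
	// enqueue the sentinel, forcing a full recount.
	for _, key := range []string{"node-1", "node-2", syncAllKey} {
		if err := d.syncPorts(key); err != nil {
			fmt.Println("sync error:", err)
		}
	}
}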
75 changes: 58 additions & 17 deletions pkg/gameservers/portallocator_test.go
@@ -16,9 +16,9 @@ package gameservers

import (
"fmt"
"testing"

"sync"
"testing"
"time"

"agones.dev/agones/pkg/apis/stable/v1alpha1"
agtesting "agones.dev/agones/pkg/testing"
@@ -48,14 +48,15 @@ func TestPortAllocatorAllocate(t *testing.T) {
nodeWatch := watch.NewFake()
m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil))

stop, cancel := agtesting.StartInformers(m)
stop, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()

// Make sure the adds don't corrupt the sync
nodeWatch.Add(&n1)
nodeWatch.Add(&n2)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))

err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)

// two nodes
@@ -83,9 +84,9 @@
nl := &corev1.NodeList{Items: []corev1.Node{n1}}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)
var ports []int32
for i := 10; i <= 20; i++ {
@@ -106,9 +107,9 @@ func TestPortAllocatorMultithreadAllocate(t *testing.T) {
nl := &corev1.NodeList{Items: []corev1.Node{n1, n2}}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)
wg := sync.WaitGroup{}

@@ -139,9 +140,9 @@ func TestPortAllocatorDeAllocate(t *testing.T) {
nl := &corev1.NodeList{Items: nodes}
return true, nl, nil
})
stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.nodeSynced)
defer cancel()
err := pa.Run(stop)
err := pa.syncAll()
assert.Nil(t, err)

for i := 0; i <= 100; i++ {
@@ -199,12 +200,12 @@ func TestPortAllocatorSyncPortAllocations(t *testing.T) {
return true, gsl, nil
})

stop, cancel := agtesting.StartInformers(m)
_, cancel := agtesting.StartInformers(m, pa.gameServerSynced, pa.nodeSynced)
defer cancel()

err := pa.Run(stop)

err := pa.syncAll()
assert.Nil(t, err)

assert.Len(t, pa.portAllocations, 3)
assert.Len(t, pa.gameServerRegistry, 5)

@@ -247,14 +248,16 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) {
return true, nl, nil
})

stop, cancel := agtesting.StartInformers(m)
stop, cancel := agtesting.StartInformers(m, pa.gameServerSynced, pa.nodeSynced)
defer cancel()

gsWatch.Add(gs1.DeepCopy())
gsWatch.Add(gs2.DeepCopy())
gsWatch.Add(gs3.DeepCopy())

err := pa.Run(stop)
assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced))

err := pa.syncAll()
assert.Nil(t, err)

// gate
@@ -291,15 +294,40 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil))
m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil))

received := make(chan string, 10)
defer close(received)

f := pa.workerqueue.SyncHandler
pa.workerqueue.SyncHandler = func(s string) error {
err := f(s)
assert.Nil(t, err, "sync handler failed")
received <- s
return nil
}

stop, cancel := agtesting.StartInformers(m)
defer cancel()

// Make sure the adds don't corrupt the sync
nodeWatch.Add(&n1)
nodeWatch.Add(&n2)

err := pa.Run(stop)
assert.Nil(t, err)
go func() {
err := pa.Run(stop)
assert.Nil(t, err)
}()

testReceived := func(expected, failMsg string) {
select {
case key := <-received:
assert.Equal(t, expected, key)
case <-time.After(3 * time.Second):
assert.FailNow(t, failMsg, "expected: %s", expected)
}
}

testReceived(n1.ObjectMeta.Name, "add node 1")
testReceived(n2.ObjectMeta.Name, "add node 2")

// add a game server
gs, err := pa.Allocate(fixture.DeepCopy())
@@ -317,6 +345,7 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
logrus.Info("adding n3")
nodeWatch.Add(&n3)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(n3.ObjectMeta.Name, "add node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 3)
@@ -330,6 +359,8 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
assert.True(t, copy.Spec.Unschedulable)
nodeWatch.Modify(copy)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "unscheduled node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
@@ -341,6 +372,8 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
copy.Spec.Unschedulable = false
nodeWatch.Modify(copy)
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "scheduled node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 3)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
@@ -350,13 +383,21 @@ func TestPortAllocatorNodeEvents(t *testing.T) {
logrus.Info("deleting n3")
nodeWatch.Delete(n3.DeepCopy())
assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
testReceived(string(syncAllKey), "deleting node 3")

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))
pa.mutex.RUnlock()

// add the n1 node again, it shouldn't do anything
nodeWatch.Add(&n1)
select {
case <-received:
assert.FailNow(t, "adding back n1: event should not happen")
case <-time.After(time.Second):
}

pa.mutex.RLock()
assert.Len(t, pa.portAllocations, 2)
assert.Equal(t, 1, countAllocatedPorts(pa, port))