Skip to content

Commit

Permalink
Merge pull request #117 from googleprivate/bug/portallocator
Browse files Browse the repository at this point in the history
Make Port Allocator idempotent for GameServers and Node events
  • Loading branch information
markmandel authored Feb 28, 2018
2 parents 36035bf + 03bd16d commit 8051c10
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 102 deletions.
8 changes: 3 additions & 5 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,11 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe
return gs, nil
}

gsCopy := gs.DeepCopy()

port, err := c.portAllocator.Allocate()
gsCopy, err := c.portAllocator.Allocate(gs.DeepCopy())
if err != nil {
return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name)
}
gsCopy.Spec.HostPort = port

gsCopy.Status.State = stablev1alpha1.Creating
c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated")

Expand All @@ -328,7 +326,7 @@ func (c *Controller) syncGameServerPortAllocationState(gs *stablev1alpha1.GameSe
if err != nil {
// if the GameServer doesn't get updated with the port data, then put the port
// back in the pool, as it will get retried on the next pass
c.portAllocator.DeAllocate(port)
c.portAllocator.DeAllocate(gsCopy)
return gs, errors.Wrapf(err, "error updating GameServer %s to default values", gs.Name)
}

Expand Down
122 changes: 81 additions & 41 deletions pkg/gameservers/portallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
Expand All @@ -45,6 +46,8 @@ type PortAllocator struct {
logger *logrus.Entry
mutex sync.RWMutex
portAllocations []portAllocation
gameServerRegistry map[types.UID]bool
nodeRegistry map[types.UID]bool
minPort int32
maxPort int32
gameServerSynced cache.InformerSynced
Expand All @@ -70,6 +73,8 @@ func NewPortAllocator(minPort, maxPort int32,
mutex: sync.RWMutex{},
minPort: minPort,
maxPort: maxPort,
gameServerRegistry: map[types.UID]bool{},
nodeRegistry: map[types.UID]bool{},
gameServerSynced: gameServers.Informer().HasSynced,
gameServerLister: gameServers.Lister(),
gameServerInformer: gameServers.Informer(),
Expand All @@ -79,14 +84,6 @@ func NewPortAllocator(minPort, maxPort int32,
}
pa.logger = runtime.NewLoggerWithType(pa)

pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
return pa
}

// Run sets up the current state of port allocations and
// starts tracking Pod and Node changes
func (pa *PortAllocator) Run(stop <-chan struct{}) error {
pa.logger.Info("Running")
pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: pa.syncDeleteGameServer,
})
Expand All @@ -98,76 +95,110 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
err := pa.syncPortAllocations(stop)
err := pa.syncPortAllocations()
if err != nil {
err := errors.Wrap(err, "error resetting ports on node update")
runtime.HandleError(pa.logger.WithField("node", newNode), err)
}
}
},
DeleteFunc: func(obj interface{}) {
err := pa.syncPortAllocations(stop)
err := pa.syncPortAllocations()
if err != nil {
err := errors.Wrap(err, "error on node deletion")
runtime.HandleError(pa.logger.WithField("node", obj), err)
}
},
})

pa.logger.Info("Flush cache sync, before syncing gameserver and node state")
pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
return pa
}

// Run blocks until the GameServer and Node informer caches have synced,
// then performs an initial sync of port allocations from cluster state.
// Event handlers are registered in NewPortAllocator, so this is non blocking
// once the initial sync completes.
// Returns an error if the caches fail to sync before stop is closed.
func (pa *PortAllocator) Run(stop <-chan struct{}) error {
	pa.logger.Info("Running")

	if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
		return errors.New("failed to wait for caches to sync")
	}

	return pa.syncPortAllocations()
}

// Allocate allocates a port. Return ErrPortNotFound if no port is
// allocatable
func (pa *PortAllocator) Allocate() (int32, error) {
// Allocate assigns a port to the GameServer and returns it.
// Return ErrPortNotFound if no port is allocatable
func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer, error) {
if gs.Spec.PortPolicy != v1alpha1.Dynamic {
return gs, errors.Errorf("Port policy of %s is not supported for port allocation", gs.Spec.PortPolicy)
}

pa.mutex.Lock()
defer pa.mutex.Unlock()
for _, n := range pa.portAllocations {
for p, taken := range n {
if !taken {
n[p] = true
return p, nil
pa.gameServerRegistry[gs.ObjectMeta.UID] = true
gs.Spec.HostPort = p
return gs, nil
}
}
}
return -1, ErrPortNotFound
return gs, ErrPortNotFound
}

// DeAllocate marks the given port as no longer allocated
func (pa *PortAllocator) DeAllocate(port int32) {
func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {
if gs.Spec.PortPolicy != v1alpha1.Dynamic {
return
}
if gs.Spec.HostPort < pa.minPort || gs.Spec.HostPort > pa.maxPort {
return
}
// skip if it wasn't previously allocated
if _, ok := pa.gameServerRegistry[gs.ObjectMeta.UID]; !ok {
pa.logger.WithField("gs", gs.ObjectMeta.Name).
Info("Did not allocate this GameServer. Ignoring for DeAllocation")
return
}
pa.mutex.Lock()
defer pa.mutex.Unlock()
pa.portAllocations = setPortAllocation(port, pa.portAllocations, false)
pa.portAllocations = setPortAllocation(gs.Spec.HostPort, pa.portAllocations, false)
delete(pa.gameServerRegistry, gs.ObjectMeta.UID)
}

// syncAddNode appends a fresh, fully-available port section for a newly
// added Node. Idempotent: a Node already present in nodeRegistry is
// skipped, so repeated informer add events do not double-register its
// ports.
func (pa *PortAllocator) syncAddNode(obj interface{}) {
	node := obj.(*corev1.Node)

	// take the lock BEFORE reading nodeRegistry: syncPortAllocations
	// replaces the same map under this mutex, so an unsynchronized read
	// here would be a data race
	pa.mutex.Lock()
	defer pa.mutex.Unlock()

	// if we've already added this node, don't do it again
	if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
		pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
		return
	}

	pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")

	// every port in [minPort, maxPort] starts out unallocated
	ports := portAllocation{}
	for i := pa.minPort; i <= pa.maxPort; i++ {
		ports[i] = false
	}

	pa.portAllocations = append(pa.portAllocations, ports)
	pa.nodeRegistry[node.ObjectMeta.UID] = true
}

// syncDeleteGameServer returns a deleted GameServer's HostPort to the pool
// of available ports via DeAllocate. Objects that are not a GameServer
// (e.g. cache DeletedFinalStateUnknown tombstones — TODO confirm against
// informer behavior) are silently ignored.
func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
	if gs, ok := object.(*v1alpha1.GameServer); ok {
		pa.logger.WithField("gs", gs).Info("syncing deleted GameServer")
		pa.DeAllocate(gs)
	}
}

// syncPortAllocations syncs the pod, node and gameserver caches then
Expand All @@ -176,39 +207,41 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
// portAllocations are marked as taken.
// Locks the mutex while doing this.
// This is basically a stop the world Garbage Collection on port allocations.
func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error {
func (pa *PortAllocator) syncPortAllocations() error {
pa.mutex.Lock()
defer pa.mutex.Unlock()

pa.logger.Info("Resetting Port Allocation")

if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) {
return nil
}

nodes, err := pa.nodeLister.List(labels.Everything())
if err != nil {
return errors.Wrap(err, "error listing all nodes")
}

// setup blank port values
nodePorts := pa.nodePortAllocation(nodes)
nodePorts, nodeRegistry := pa.nodePortAllocation(nodes)

gameservers, err := pa.gameServerLister.List(labels.Everything())
if err != nil {
return errors.Wrapf(err, "error listing all GameServers")
}

gsRegistry := map[types.UID]bool{}

// place to put GameServer port allocations that are not ready yet/after the ready state
var nonReadyNodesPorts []int32
// Check GameServers as well, as some
for _, gs := range gameservers {
// if the node doesn't exist, it's likely unscheduled
_, ok := nodePorts[gs.Status.NodeName]
if gs.Status.NodeName != "" && ok {
nodePorts[gs.Status.NodeName][gs.Status.Port] = true
} else if gs.Spec.HostPort != 0 {
nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort)
if gs.Spec.PortPolicy == v1alpha1.Dynamic {
gsRegistry[gs.ObjectMeta.UID] = true

// if the node doesn't exist, it's likely unscheduled
_, ok := nodePorts[gs.Status.NodeName]
if gs.Status.NodeName != "" && ok {
nodePorts[gs.Status.NodeName][gs.Status.Port] = true
} else if gs.Spec.HostPort != 0 {
nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort)
}
}
}

Expand All @@ -229,15 +262,21 @@ func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error {
}

pa.portAllocations = allocations
pa.gameServerRegistry = gsRegistry
pa.nodeRegistry = nodeRegistry

return nil
}

// nodePortAllocation returns a map of port allocations all set to being available
// with a map key for each node
func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
// with a map key for each node, as well as the node registry record (since we're already looping)
func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) (map[string]portAllocation, map[types.UID]bool) {
nodePorts := map[string]portAllocation{}
nodeRegistry := map[types.UID]bool{}

for _, n := range nodes {
nodeRegistry[n.ObjectMeta.UID] = true

// ignore unschedulable nodes
if !n.Spec.Unschedulable {
nodePorts[n.Name] = portAllocation{}
Expand All @@ -246,7 +285,8 @@ func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]por
}
}
}
return nodePorts

return nodePorts, nodeRegistry
}

// setPortAllocation takes a port from an all
Expand Down
Loading

0 comments on commit 8051c10

Please sign in to comment.