Skip to content

Commit

Permalink
Implements UpdateList, AddValue, and RemoveValue in the SDK Server
Browse files Browse the repository at this point in the history
  • Loading branch information
igooch committed Oct 24, 2023
1 parent 4cebbf9 commit dda41d3
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 16 deletions.
6 changes: 3 additions & 3 deletions pkg/apis/agones/v1/gameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ func (gs *GameServer) AppendListValues(name string, values []string) error {
return errors.Errorf("unable to AppendListValues: Name %s, Values %s. No values to append", name, values)
}
if list, ok := gs.Status.Lists[name]; ok {
mergedList := mergeRemoveDuplicates(list.Values, values)
mergedList := MergeRemoveDuplicates(list.Values, values)
// TODO: Truncate and apply up to cutoff
if len(mergedList) > int(list.Capacity) {
return errors.Errorf("unable to AppendListValues: Name %s, Values %s. Appended list length %d exceeds list capacity %d", name, values, len(mergedList), list.Capacity)
Expand All @@ -953,10 +953,10 @@ func (gs *GameServer) AppendListValues(name string, values []string) error {
return errors.Errorf("unable to AppendListValues: Name %s, Values %s. List not found in GameServer %s", name, values, gs.ObjectMeta.GetName())
}

// mergeRemoveDuplicates merges two lists and removes any duplicate values.
// MergeRemoveDuplicates merges two lists and removes any duplicate values.
// Maintains ordering, so new values from list2 are appended to the end of list1.
// Returns a new list with unique values only.
func mergeRemoveDuplicates(list1 []string, list2 []string) []string {
func MergeRemoveDuplicates(list1 []string, list2 []string) []string {
uniqueList := []string{}
listMap := make(map[string]bool)
for _, v1 := range list1 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/agones/v1/gameserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,7 @@ func TestMergeRemoveDuplicates(t *testing.T) {

for test, testCase := range testCases {
t.Run(test, func(t *testing.T) {
got := mergeRemoveDuplicates(testCase.str1, testCase.str2)
got := MergeRemoveDuplicates(testCase.str1, testCase.str2)
assert.Equal(t, testCase.want, got)
})
}
Expand Down
235 changes: 223 additions & 12 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
updatePlayerCapacity Operation = "updatePlayerCapacity"
updateConnectedPlayers Operation = "updateConnectedPlayers"
updateCounters Operation = "updateCounters"
updateLists Operation = "updateLists"
updatePeriod time.Duration = time.Second
)

Expand All @@ -81,6 +82,15 @@ type counterUpdateRequest struct {
counter agonesv1.CounterStatus
}

type listUpdateRequest struct {
// Capacity of the List as set by capacitySet.
capacitySet *int64
// Values to add to the List
listAppend []string
// Values to remove from the List
listDelete []string
}

// SDKServer is a gRPC server, that is meant to be a sidecar
// for a GameServer that will update the game server status on SDK requests
//
Expand Down Expand Up @@ -118,6 +128,7 @@ type SDKServer struct {
gsPlayerCapacity int64
gsConnectedPlayers []string
gsCounterUpdates map[string]counterUpdateRequest
gsListUpdates map[string]listUpdateRequest
gsCopy *agonesv1.GameServer
}

Expand Down Expand Up @@ -160,6 +171,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
// Once FeatureCountsAndLists is in GA, move this into SDKServer creation above.
s.gsCounterUpdates = map[string]counterUpdateRequest{}
s.gsListUpdates = map[string]listUpdateRequest{}
}

s.informerFactory = factory
Expand Down Expand Up @@ -332,6 +344,8 @@ func (s *SDKServer) syncGameServer(ctx context.Context, key string) error {
return s.updateConnectedPlayers(ctx)
case updateCounters:
return s.updateCounter(ctx)
case updateLists:
return s.updateList(ctx)
}

return errors.Errorf("could not sync game server key: %s", key)
Expand Down Expand Up @@ -978,48 +992,245 @@ func (s *SDKServer) updateCounter(ctx context.Context) error {
return nil
}

// GetList returns a List. Returns NOT_FOUND if the List does not exist.
// GetList returns a List. Returns not found if the List does not exist.
// [Stage:Alpha]
// [FeatureFlag:CountsAndLists]
func (s *SDKServer) GetList(ctx context.Context, in *alpha.GetListRequest) (*alpha.List, error) {
if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists)
}
// TODO(#2716): Implement me
return nil, errors.Errorf("Unimplemented -- GetList coming soon")
s.logger.WithField("name", in.Name).Debug("Getting List")

gs, err := s.gameServer()
if err != nil {
return nil, err
}

s.gsUpdateMutex.RLock()
defer s.gsUpdateMutex.RUnlock()

list, ok := gs.Status.Lists[in.Name]
if !ok {
return nil, errors.Errorf("list not found: %s", in.Name)
}

s.logger.WithField("Get List", list).Debugf("Got List %s", in.Name)
protoList := alpha.List{Name: in.Name, Values: list.Values, Capacity: list.Capacity}
// If there are batched changes that have not yet been applied, apply them to the List.
// This does NOT validate batched the changes.
if listUpdate, ok := s.gsListUpdates[in.Name]; ok {
if listUpdate.capacitySet != nil {
protoList.Capacity = *listUpdate.capacitySet
}
// TODO: DELETE
// if len(listUpdate.listDelete) != 0 {
// }
// TODO: APPEND
// if len(listUpdate.listAppend) != 0 {
// }
// TODO: Truncate if past capacity
s.logger.WithField("Get List", list).Debugf("Applied Batched List Updates %v", listUpdate)
}

return &protoList, nil
}

// UpdateList returns the updated List.
// UpdateList collapses all update capacity requests for a given List into a single UpdateList request.
// This function currently only updates the Capacity of a List.
// Returns error if the List does not exist (name cannot be updated).
// Returns error if the List update capacity is out of range [0,1000].
// [Stage:Alpha]
// [FeatureFlag:CountsAndLists]
func (s *SDKServer) UpdateList(ctx context.Context, in *alpha.UpdateListRequest) (*alpha.List, error) {
if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists)
}
// TODO(#2716): Implement Me
return nil, errors.Errorf("Unimplemented -- UpdateList coming soon")

if in.List == nil || in.UpdateMask == nil {
return nil, errors.Errorf("invalid argument. List: %v and UpdateMask %v cannot be nil", in.List, in.UpdateMask)
}

name := in.List.Name
s.logger.WithField("name", name).Debug("Update List -- Currently only used for Updating Capacity")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

gs, err := s.gameServer()
if err != nil {
return nil, err
}

// TODO: Pull in variable Max Capacity from CRD instead of hard-coded number here.
if in.List.Capacity < 0 || in.List.Capacity > 1000 {
return nil, errors.Errorf("out of range. Capacity must be within range [0,1000]. Found Capacity: %d", in.List.Capacity)
}

if _, ok := gs.Status.Lists[name]; ok {
batchList := s.gsListUpdates[name]
batchList.capacitySet = &in.List.Capacity
s.gsListUpdates[name] = batchList
// Queue up the Update for later batch processing by updateLists.
s.workerqueue.Enqueue(cache.ExplicitKey(updateLists))
return &alpha.List{}, nil
}
return nil, errors.Errorf("not found. %s List not found", name)
}

// AddListValue returns the updated List.
// AddListValue collapses all append a value to the end of a List requests into a single UpdateList request.
// Returns not found if the List does not exist.
// Returns already exists if the value is already in the List.
// Returns out of range if the List is already at Capacity.
// [Stage:Alpha]
// [FeatureFlag:CountsAndLists]
func (s *SDKServer) AddListValue(ctx context.Context, in *alpha.AddListValueRequest) (*alpha.List, error) {
if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists)
}
// TODO(#2716): Implement Me
return nil, errors.Errorf("Unimplemented -- AddListValue coming soon")
s.logger.WithField("name", in.Name).Debug("Add List Value")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

gs, err := s.gameServer()
if err != nil {
return nil, err
}

if list, ok := gs.Status.Lists[in.Name]; ok {
batchList := s.gsListUpdates[in.Name]
// Verify room to add another value
var capacity int64
if batchList.capacitySet != nil {
capacity = *batchList.capacitySet
} else {
capacity = int64(len(list.Values) + len(batchList.listAppend) - len(batchList.listDelete))
}
if list.Capacity <= capacity {
return nil, errors.Errorf("out of range. No available capacity. Current Capacity: %d, List Size: %d", list.Capacity, len(list.Values))
}
// Verify value does not already exist in the list
// TODO: This does not check batched but not yet applied append / remove values. Should we do this?
// (Easy to check not yet applied values, hard to check removed and re-added values.) I'm
// thinking this would be better / easier to do as part of the batch apply update to list, and
// not verify here.
for _, val := range list.Values {
if in.Value == val {
return nil, errors.Errorf("already exists. Value: %s already in List: %s", in.Value, in.Name)
}
}
batchList.listAppend = append(batchList.listAppend, in.Value)
s.gsListUpdates[in.Name] = batchList
// Queue up the Update for later batch processing by updateLists.
s.workerqueue.Enqueue(cache.ExplicitKey(updateLists))
return &alpha.List{}, nil
}
return nil, errors.Errorf("not found. %s List not found", in.Name)
}

// RemoveListValue returns the updated List.
// RemoveListValue collapses all remove a value from a List requests into a single UpdateList request.
// Returns not found if the List does not exist.
// Returns not found if the value is not in the List.
// [Stage:Alpha]
// [FeatureFlag:CountsAndLists]
func (s *SDKServer) RemoveListValue(ctx context.Context, in *alpha.RemoveListValueRequest) (*alpha.List, error) {
if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) {
return nil, errors.Errorf("%s not enabled", runtime.FeatureCountsAndLists)
}
// TODO(#2716): Implement Me
return nil, errors.Errorf("Unimplemented -- RemoveListValue coming soon")

s.logger.WithField("name", in.Name).Debug("Remove List Value")

s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

gs, err := s.gameServer()
if err != nil {
return nil, err
}

if list, ok := gs.Status.Lists[in.Name]; ok {
// Verify value exists in the list
for _, val := range list.Values {
if in.Value != val {
continue
}
// Add value to remove to gsListUpdates map.
batchList := s.gsListUpdates[in.Name]
batchList.listDelete = append(batchList.listDelete, in.Value)
s.gsListUpdates[in.Name] = batchList
// Queue up the Update for later batch processing by updateLists.
s.workerqueue.Enqueue(cache.ExplicitKey(updateLists))
return &alpha.List{}, nil
}
return nil, errors.Errorf("not found. Value: %s not found in List: %s", in.Value, in.Name)
}
return nil, errors.Errorf("not found. %s List not found", in.Name)
}

// updateList updates the Lists in the GameServer's Status with the batched update list requests.
// Includes all SetCapacity, AddValue, and RemoveValue requests in the batched request.
func (s *SDKServer) updateList(ctx context.Context) error {
gs, err := s.gameServer()
if err != nil {
return err
}
gsCopy := gs.DeepCopy()

s.logger.WithField("batchListUpdates", s.gsListUpdates).Debug("Batch updating List(s)")
s.gsUpdateMutex.Lock()
defer s.gsUpdateMutex.Unlock()

names := []string{}

for name, listReq := range s.gsListUpdates {
list, ok := gsCopy.Status.Lists[name]
if !ok {
continue
}
if listReq.capacitySet != nil {
list.Capacity = *listReq.capacitySet
}
if len(listReq.listDelete) != 0 {
// TODO: Is there a quicker way of doing this while still keeping the ordering in tact?
for _, dltVal := range listReq.listDelete {
for i, val := range list.Values {
if dltVal == val {
// Remove value (maintains list ordering and modifies underlying gameserverstatus List.Values array).
list.Values = append(list.Values[:i], list.Values[i+1:]...)
}
}
}
}
if len(listReq.listAppend) != 0 {
list.Values = agonesv1.MergeRemoveDuplicates(list.Values, listReq.listAppend)
}

if int64(len(list.Values)) > list.Capacity {
s.logger.Debugf("truncating Values in Update List request to List Capacity %d", list.Capacity)
list.Values = append([]string{}, list.Values[:list.Capacity]...)
}
gsCopy.Status.Lists[name] = list
names = append(names, name)
}

gs, err = s.gameServerGetter.GameServers(s.namespace).Update(ctx, gsCopy, metav1.UpdateOptions{})
if err != nil {
return err
}

// Record an event per List update
for _, name := range names {
s.recorder.Event(gs, corev1.EventTypeNormal, "UpdateList", fmt.Sprintf("List %s updated", name))
s.logger.Debugf("List %s updated to List Capacity: %d, Values: %v",
name, gs.Status.Lists[name].Capacity, gs.Status.Lists[name].Values)
}

// Cache a copy of the successfully updated gameserver
s.gsCopy = gs
// Clear the gsCounterUpdates
s.gsCounterUpdates = map[string]counterUpdateRequest{}

return nil
}

// sendGameServerUpdate sends a watch game server event
Expand Down

0 comments on commit dda41d3

Please sign in to comment.