Skip to content

Commit

Permalink
Fix a few corner cases when updating Egress conditions
Browse files Browse the repository at this point in the history
1. Avoid generating a transient IPAssigned failure by differentiating a
scheduling failure from the not-yet-processed case.
2. Fix duplicate IPAllocated conditions.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Oct 16, 2023
1 parent ed3c85b commit 335b354
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 42 deletions.
25 changes: 16 additions & 9 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func (c *EgressController) unbindPodEgress(pod, egress string) (string, bool) {
return "", false
}

func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP string) error {
func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP string, scheduleErr error) error {
isLocal := false
if egressIP != "" {
isLocal = c.localIPDetector.IsLocalIP(egressIP)
Expand Down Expand Up @@ -647,14 +647,16 @@ func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP s
}
desiredStatus.EgressNode = ""
desiredStatus.EgressIP = ""
if isEgressSchedulable(egress) {
// If the error is nil, it means the Egress hasn't been processed yet. Therefore, we only set IPAssigned
// condition to false when there is an error.
if scheduleErr != nil {
desiredStatus.Conditions = []crdv1b1.EgressCondition{
{
Type: crdv1b1.IPAssigned,
Status: v1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: "NoAvailableNode",
Message: "No available Node can be elected as EgressNode",
Reason: "AssignmentError",
Message: fmt.Sprintf("Failed to assign the IP to EgressNode: %v", scheduleErr),
},
}
}
Expand All @@ -668,12 +670,14 @@ func (c *EgressController) updateEgressStatus(egress *crdv1b1.Egress, egressIP s
if compareEgressStatus(toUpdate.Status, *desiredStatus) {
return nil
}
statusToUpdate := desiredStatus.DeepCopy()
// Copy conditions other than crdv1b1.IPAssigned to statusToUpdate.
for _, c := range toUpdate.Status.Conditions {
if c.Type != crdv1b1.IPAssigned {
desiredStatus.Conditions = append(desiredStatus.Conditions, c)
statusToUpdate.Conditions = append(statusToUpdate.Conditions, c)
}
}
toUpdate.Status = *desiredStatus
toUpdate.Status = *statusToUpdate

klog.V(2).InfoS("Updating Egress status", "Egress", egress.Name, "oldNode", egress.Status.EgressNode, "newNode", toUpdate.Status.EgressNode)
_, updateErr = c.crdClient.CrdV1beta1().Egresses().UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{})
Expand Down Expand Up @@ -717,13 +721,16 @@ func (c *EgressController) syncEgress(egressName string) error {

var desiredEgressIP string
var desiredNode string
var scheduleErr error
// Only check whether the Egress IP should be assigned to this Node when the Egress is schedulable.
// Otherwise, users are responsible for assigning the Egress IP to Nodes.
if isEgressSchedulable(egress) {
egressIP, egressNode, scheduled := c.egressIPScheduler.GetEgressIPAndNode(egressName)
egressIP, egressNode, err, scheduled := c.egressIPScheduler.GetEgressIPAndNode(egressName)
if scheduled {
desiredEgressIP = egressIP
desiredNode = egressNode
} else {
scheduleErr = err
}
} else {
desiredEgressIP = egress.Spec.EgressIP
Expand All @@ -739,7 +746,7 @@ func (c *EgressController) syncEgress(egressName string) error {
}
// Do not proceed if EgressIP is empty.
if desiredEgressIP == "" {
if err := c.updateEgressStatus(egress, ""); err != nil {
if err := c.updateEgressStatus(egress, "", scheduleErr); err != nil {
return fmt.Errorf("update Egress %s status error: %v", egressName, err)
}
return nil
Expand Down Expand Up @@ -778,7 +785,7 @@ func (c *EgressController) syncEgress(egressName string) error {
eState.mark = mark
}

if err := c.updateEgressStatus(egress, desiredEgressIP); err != nil {
if err := c.updateEgressStatus(egress, desiredEgressIP, nil); err != nil {
return fmt.Errorf("update Egress %s status error: %v", egressName, err)
}

Expand Down
40 changes: 16 additions & 24 deletions pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -62,6 +63,12 @@ const (
fakeNode = "node1"
)

// semanticIgnoringTime is an equality helper for tests whose only custom rule
// reports any two metav1.Time values as equal, so comparisons made through it
// are not affected by timestamp fields such as a condition's LastTransitionTime.
var semanticIgnoringTime = conversion.EqualitiesOrDie(
	// Timestamps are never compared: every pair of metav1.Time values matches.
	func(_, _ metav1.Time) bool { return true },
)

type fakeLocalIPDetector struct {
localIPs sets.Set[string]
}
Expand Down Expand Up @@ -581,12 +588,12 @@ func TestSyncEgress(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode, Conditions: nil},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "egressB", UID: "uidB"},
Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP2, ExternalIPPool: "external-ip-pool"},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP2, EgressNode: fakeNode, Conditions: []crdv1b1.EgressCondition{{Type: crdv1b1.IPAssigned, Status: v1.ConditionTrue, Reason: "Assigned", Message: "EgressIP is successfully assigned to EgressNode", LastTransitionTime: metav1.NewTime(time.Time{})}}},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP2, EgressNode: fakeNode, Conditions: []crdv1b1.EgressCondition{{Type: crdv1b1.IPAssigned, Status: v1.ConditionTrue, Reason: "Assigned", Message: "EgressIP is successfully assigned to EgressNode"}}},
},
},
expectedCalls: func(mockOFClient *openflowtest.MockClient, mockRouteClient *routetest.MockInterface, mockIPAssigner *ipassignertest.MockIPAssigner) {
Expand Down Expand Up @@ -631,7 +638,7 @@ func TestSyncEgress(t *testing.T) {
{
ObjectMeta: metav1.ObjectMeta{Name: "egressA", UID: "uidA"},
Spec: crdv1b1.EgressSpec{EgressIP: fakeLocalEgressIP1, ExternalIPPool: "external-ip-pool"},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode, Conditions: []crdv1b1.EgressCondition{{Type: crdv1b1.IPAssigned, Status: v1.ConditionTrue, Reason: "Assigned", Message: "EgressIP is successfully assigned to EgressNode", LastTransitionTime: metav1.NewTime(time.Time{})}}},
Status: crdv1b1.EgressStatus{EgressIP: fakeLocalEgressIP1, EgressNode: fakeNode, Conditions: []crdv1b1.EgressCondition{{Type: crdv1b1.IPAssigned, Status: v1.ConditionTrue, Reason: "Assigned", Message: "EgressIP is successfully assigned to EgressNode"}}},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "egressB", UID: "uidB"},
Expand Down Expand Up @@ -739,7 +746,7 @@ func TestSyncEgress(t *testing.T) {
for _, expectedEgress := range tt.expectedEgresses {
gotEgress, err := c.crdClient.CrdV1beta1().Egresses().Get(context.TODO(), expectedEgress.Name, metav1.GetOptions{})
require.NoError(t, err)
equalEgressNoTimestamp(t, expectedEgress, gotEgress)
assert.True(t, semanticIgnoringTime.DeepEqual(expectedEgress, gotEgress))
}
})
}
Expand Down Expand Up @@ -1013,7 +1020,7 @@ func TestUpdateEgressStatus(t *testing.T) {
c := &EgressController{crdClient: fakeClient, nodeName: fakeNode, localIPDetector: localIPDetector}
_, err := c.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), &egress, metav1.CreateOptions{})
assert.NoError(t, err)
err = c.updateEgressStatus(&egress, fakeLocalEgressIP1)
err = c.updateEgressStatus(&egress, fakeLocalEgressIP1, nil)
if err != tt.expectedError {
t.Errorf("Update Egress error not match, got: %v, expected: %v", err, tt.expectedError)
}
Expand Down Expand Up @@ -1205,11 +1212,10 @@ func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface
func TestCompareEgressStatus(t *testing.T) {
newIPAssignedCondition := func(c v1.ConditionStatus, reason string, message string) crdv1b1.EgressCondition {
return crdv1b1.EgressCondition{
Type: crdv1b1.IPAssigned,
Status: c,
Message: message,
Reason: reason,
LastTransitionTime: metav1.Now(),
Type: crdv1b1.IPAssigned,
Status: c,
Message: message,
Reason: reason,
}
}
tests := []struct {
Expand Down Expand Up @@ -1302,17 +1308,3 @@ func TestCompareEgressStatus(t *testing.T) {
})
}
}

// equalEgressNoTimestamp asserts that expectedEgress and actualEgress are equal
// while ignoring the LastTransitionTime of every status condition, and returns
// the result of the assertion.
//
// If either Egress is nil, or either has nil conditions, there is no timestamp
// to neutralize and the two values are compared as they are.
func equalEgressNoTimestamp(t *testing.T, expectedEgress, actualEgress *crdv1b1.Egress) bool {
	// Attribute assertion failures to the calling test line, not to this helper.
	t.Helper()
	if expectedEgress == nil || actualEgress == nil || expectedEgress.Status.Conditions == nil || actualEgress.Status.Conditions == nil {
		return assert.Equal(t, expectedEgress, actualEgress)
	}
	// Normalize copies so that the caller's objects keep their original timestamps.
	expected, actual := expectedEgress.DeepCopy(), actualEgress.DeepCopy()
	zeroTime := metav1.NewTime(time.Time{})
	for i := range expected.Status.Conditions {
		expected.Status.Conditions[i].LastTransitionTime = zeroTime
	}
	for i := range actual.Status.Conditions {
		actual.Status.Conditions[i].LastTransitionTime = zeroTime
	}
	return assert.Equal(t, expected, actual)
}
14 changes: 10 additions & 4 deletions pkg/agent/controller/egress/ip_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type scheduleEventHandler func(egress string)
// scheduleResult is the outcome recorded by the scheduler for a single Egress.
type scheduleResult struct {
	// ip and node are the Egress IP and the Node it was scheduled to.
	ip, node string
	// err is the scheduling error, if any. It is stored so that a scheduling
	// failure can be told apart from an Egress that has no result yet.
	err error
}

// egressIPScheduler is responsible for scheduling Egress IPs to appropriate Nodes according to the Node selector of the
Expand Down Expand Up @@ -248,15 +249,18 @@ func (s *egressIPScheduler) AddEventHandler(handler scheduleEventHandler) {
s.eventHandlers = append(s.eventHandlers, handler)
}

func (s *egressIPScheduler) GetEgressIPAndNode(egress string) (string, string, bool) {
func (s *egressIPScheduler) GetEgressIPAndNode(egress string) (string, string, error, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()

result, exists := s.scheduleResults[egress]
if !exists {
return "", "", false
return "", "", nil, false
}
return result.ip, result.node, true
if result.err != nil {
return "", "", result.err, false
}
return result.ip, result.node, nil, true
}

// EgressesByCreationTimestamp sorts a list of Egresses by creation timestamp.
Expand Down Expand Up @@ -356,6 +360,8 @@ func (s *egressIPScheduler) schedule() {
} else {
klog.ErrorS(err, "Failed to select Node for Egress", "egress", klog.KObj(egress))
}
// Store error in its result to differentiate scheduling error from unprocessed case.
newResults[egress.Name] = &scheduleResult{err: err}
continue
}
result := &scheduleResult{
Expand All @@ -380,7 +386,7 @@ func (s *egressIPScheduler) schedule() {
prevResults := s.scheduleResults
for egress, result := range newResults {
prevResult, exists := prevResults[egress]
if !exists || prevResult.ip != result.ip || prevResult.node != result.node {
if !exists || prevResult.ip != result.ip || prevResult.node != result.node || prevResult.err != result.err {
egressesToUpdate = append(egressesToUpdate, egress)
}
delete(prevResults, egress)
Expand Down
12 changes: 10 additions & 2 deletions pkg/agent/controller/egress/ip_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ func TestSchedule(t *testing.T) {
node: "node2",
ip: "1.1.1.11",
},
"egressC": {
err: memberlist.ErrNoNodeAvailable,
},
},
},
{
Expand Down Expand Up @@ -178,6 +181,9 @@ func TestSchedule(t *testing.T) {
node: "node3",
ip: "1.1.1.11",
},
"egressC": {
err: memberlist.ErrNoNodeAvailable,
},
},
},
}
Expand Down Expand Up @@ -329,7 +335,7 @@ func TestRun(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "egressD", UID: "uidD", CreationTimestamp: metav1.NewTime(time.Unix(4, 0))},
Spec: crdv1b1.EgressSpec{EgressIP: "1.1.1.1", ExternalIPPool: "pool1"},
}, metav1.CreateOptions{})
assertReceivedItems(t, egressUpdates, sets.New[string]())
assertReceivedItems(t, egressUpdates, sets.New[string]("egressD"))
assertScheduleResult(t, s, "egressD", "", "", false)

// After node2 joins, egressB should be moved to node2 determined by its consistent hash result, and egressD should be assigned to node1.
Expand Down Expand Up @@ -357,6 +363,7 @@ func TestRun(t *testing.T) {
}

func assertReceivedItems(t *testing.T, ch <-chan string, expectedItems sets.Set[string]) {
t.Helper()
receivedItems := sets.New[string]()
for i := 0; i < expectedItems.Len(); i++ {
select {
Expand All @@ -376,7 +383,8 @@ func assertReceivedItems(t *testing.T, ch <-chan string, expectedItems sets.Set[
}

func assertScheduleResult(t *testing.T, s *egressIPScheduler, egress, egressIP, egressNode string, scheduled bool) {
gotEgressIP, gotEgressNode, gotScheduled := s.GetEgressIPAndNode(egress)
t.Helper()
gotEgressIP, gotEgressNode, _, gotScheduled := s.GetEgressIPAndNode(egress)
assert.Equal(t, egressIP, gotEgressIP)
assert.Equal(t, egressNode, gotEgressNode)
assert.Equal(t, scheduled, gotScheduled)
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/crd/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,12 @@ type EgressStatus struct {
type EgressConditionType string

const (
// IPAllocated means at least one IP has been allocated to the Egress from ExternalIPPool.
// It is not applicable for Egresses with empty ExternalIPPool.
IPAllocated EgressConditionType = "IPAllocated"
IPAssigned EgressConditionType = "IPAssigned"
// IPAssigned means the Egress has been assigned to a Node.
// It is not applicable for Egresses with empty ExternalIPPool.
IPAssigned EgressConditionType = "IPAssigned"
)

type EgressCondition struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/apis/crd/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,8 +843,12 @@ type EgressStatus struct {
type EgressConditionType string

const (
// IPAllocated means at least one IP has been allocated to the Egress from ExternalIPPool.
// It is not applicable for Egresses with empty ExternalIPPool.
IPAllocated EgressConditionType = "IPAllocated"
IPAssigned EgressConditionType = "IPAssigned"
// IPAssigned means the Egress has been assigned to a Node.
// It is not applicable for Egresses with empty ExternalIPPool.
IPAssigned EgressConditionType = "IPAssigned"
)

type EgressCondition struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/egress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (c *EgressController) updateEgressAllocatedCondition(egress *egressv1beta1.
Type: egressv1beta1.IPAllocated,
Status: v1.ConditionFalse,
Reason: "AllocationError",
Message: fmt.Sprintf("Cannot allocate EgressIP from ExternalIPPool %s due to: %v", egress.Spec.ExternalIPPool, err),
Message: fmt.Sprintf("Cannot allocate EgressIP from ExternalIPPool: %v", err),
LastTransitionTime: metav1.Now(),
}
}
Expand Down

0 comments on commit 335b354

Please sign in to comment.