diff --git a/pkg/neg/syncers/dualstack/migrator.go b/pkg/neg/syncers/dualstack/migrator.go
index 73f516d582..5f820629ef 100644
--- a/pkg/neg/syncers/dualstack/migrator.go
+++ b/pkg/neg/syncers/dualstack/migrator.go
@@ -17,6 +17,7 @@ limitations under the License.
 package dualstack
 
 import (
+	"math"
 	"sync"
 	"time"
 
@@ -27,6 +28,16 @@ import (
 const (
 	// Default time to wait between two successive migration-detachments.
 	defaultMigrationWaitDuration = 1 * time.Minute
+	// Default threshold used to determine if it has been too long since the last
+	// successful detachment.
+	defaultPreviousDetachThreshold = 5 * time.Minute
+	// Default multiplication factor used to decide the maximum number of
+	// endpoints which the migrator can attempt to migrate in each Filter
+	// invocation.
+	defaultFractionOfMigratingEndpoints float64 = 0.1
+	// Default multiplication factor used to decide if there are too many pending
+	// NEG attach operations.
+	defaultFractionForPendingAttachmentThreshold float64 = 0.5
 )
 
 // Migrator exposes functions to control the migration of single-stack
@@ -44,25 +55,33 @@ const (
 //
 // An endpoint is said to be a migration-endpoint if its current state is
 // single-stack but desired state is dual-stack (and vice versa.)
-//
-// TODO(gauravkghildiyal): Add details about the heuristics as we go on
-// implementing.
 type Migrator struct {
 	// Setting this to false will make all exported functions a no-op.
 	enableDualStack bool
 	// The NEG syncer which will be synced when Continue gets called.
 	syncer syncable
 
-	// mu protects paused and continueInProgress.
+	// mu protects paused, continueInProgress and previousDetach.
 	mu sync.Mutex
 	// Identifies whether the migrator is paused.
 	paused bool
 	// Identifies if some async operation triggered by Continue is still in
 	// progress.
 	continueInProgress bool
+	// The most recent time when Continue was invoked for a successful detachment.
+	previousDetach time.Time
 
 	// Time to wait between two successive migration-detachments.
 	migrationWaitDuration time.Duration
+	// Threshold used to determine if it has been too long since the
+	// last successful detachment.
+	previousDetachThreshold time.Duration
+	// Multiplication factor used to decide the maximum number of endpoints which
+	// the migrator can attempt to migrate in each Filter invocation.
+	fractionOfMigratingEndpoints float64
+	// Multiplication factor used to decide if there are too many pending NEG
+	// attach operations.
+	fractionForPendingAttachmentThreshold float64
 
 	logger klog.Logger
 }
@@ -73,10 +92,13 @@ type syncable interface {
 
 func NewMigrator(enableDualStackNEG bool, syncer syncable, logger klog.Logger) *Migrator {
 	return &Migrator{
-		enableDualStack:       enableDualStackNEG,
-		syncer:                syncer,
-		migrationWaitDuration: defaultMigrationWaitDuration,
-		logger:                logger.WithName("DualStackMigrator"),
+		enableDualStack:                       enableDualStackNEG,
+		syncer:                                syncer,
+		migrationWaitDuration:                 defaultMigrationWaitDuration,
+		previousDetachThreshold:               defaultPreviousDetachThreshold,
+		fractionOfMigratingEndpoints:          defaultFractionOfMigratingEndpoints,
+		fractionForPendingAttachmentThreshold: defaultFractionForPendingAttachmentThreshold,
+		logger:                                logger.WithName("DualStackMigrator"),
 	}
 }
 
@@ -91,9 +113,6 @@ func NewMigrator(enableDualStackNEG bool, syncer syncable, logger klog.Logger) *
 // empty return value signifies that detachment was not started (which is the
 // case when there were no migration-endpoints to begin with, or the migrator
 // was paused.)
-//
-// Refer the comment on [Migrator] for further details and
-// terminologies.
 func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[string]types.NetworkEndpointSet) string {
 	if !d.enableDualStack {
 		return ""
@@ -101,19 +120,14 @@ func (d *Migrator) Filter(addEndpoints, removeEndpoints, committedEndpoints map[
 
 	_, migrationEndpointsInRemoveSet := findAndFilterMigrationEndpoints(addEndpoints, removeEndpoints)
 
-	if d.isPaused() {
+	migrationCount := endpointsCount(migrationEndpointsInRemoveSet)
+	paused := d.isPaused()
+	if migrationCount == 0 || paused {
+		d.logger.V(2).Info("Not starting migration detachments", "migrationCount", migrationCount, "paused", paused)
 		return ""
 	}
 
-	// TODO(gauravkghildiyal): Implement rate limited migration-detachment.
-	for zone, endpointSet := range migrationEndpointsInRemoveSet {
-		if endpointSet.Len() != 0 {
-			removeEndpoints[zone] = removeEndpoints[zone].Union(endpointSet)
-			return zone
-		}
-	}
-
-	return ""
+	return d.calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpointsInRemoveSet)
 }
 
 // Pause will prevent any subsequent Filter() invocations from starting
@@ -168,6 +182,7 @@ func (d *Migrator) Continue(err error) {
 		// NEG Detach succeeded; unpause after migrationWaitDuration and trigger
 		// resync.
 		d.continueInProgress = true
+		d.previousDetach = time.Now()
 
 		go func() {
 			time.Sleep(d.migrationWaitDuration)
@@ -187,6 +202,83 @@ func (d *Migrator) isPaused() bool {
 	return d.paused
 }
 
+// calculateMigrationEndpointsToDetach will move a subset of migrationEndpoints
+// to removeEndpoints so that they can be detached. The number of endpoints to
+// move will be determined using the following heuristic:
+//
+//  1. The desired number of endpoints which can be moved is:
+//
+//     fractionOfMigratingEndpoints * count(committedEndpoints + migrationEndpoints)
+//
+//  2. All endpoints being moved will be from the same zone.
+//
+//  3. If all zones have fewer than the desired number of endpoints, then all
+//     endpoints from the largest zone will be moved.
+//
+//  4. No endpoints will be moved if there are many endpoints waiting to be
+//     attached (as determined by the manyEndpointsWaitingToBeAttached()
+//     function) AND the previous successful detach was quite recent (as
+//     determined by the tooLongSincePreviousDetach() function).
+func (d *Migrator) calculateMigrationEndpointsToDetach(addEndpoints, removeEndpoints, committedEndpoints, migrationEndpoints map[string]types.NetworkEndpointSet) string {
+	addCount := endpointsCount(addEndpoints)
+	committedCount := endpointsCount(committedEndpoints)
+	migrationCount := endpointsCount(migrationEndpoints)
+
+	if d.manyEndpointsWaitingToBeAttached(addCount, committedCount, migrationCount) && !d.tooLongSincePreviousDetach() {
+		d.logger.V(1).Info("Not starting migration detachments; Too many attachments are pending and the threshold for forceful detach hasn't been reached.",
+			"addCount", addCount, "committedCount", committedCount, "migrationCount", migrationCount, "fractionForPendingAttachmentThreshold", d.fractionForPendingAttachmentThreshold,
+			"previousDetach", d.previousDetach, "previousDetachThreshold", d.previousDetachThreshold)
+		return ""
+	}
+
+	// Find the zone which has the maximum number of migration-endpoints.
+	zone, maxZoneEndpointCount := "", 0
+	for curZone, endpointSet := range migrationEndpoints {
+		if endpointSet.Len() > maxZoneEndpointCount {
+			maxZoneEndpointCount = endpointSet.Len()
+			zone = curZone
+		}
+	}
+	if zone == "" {
+		return ""
+	}
+
+	currentlyMigratingCount := int(math.Ceil(float64(committedCount+migrationCount) * d.fractionOfMigratingEndpoints))
+	if currentlyMigratingCount > maxZoneEndpointCount {
+		currentlyMigratingCount = maxZoneEndpointCount
+	}
+	d.logger.V(2).Info("Result of migration heuristic calculations", "currentlyMigratingCount", currentlyMigratingCount, "totalMigrationCount", migrationCount)
+
+	if removeEndpoints[zone] == nil {
+		removeEndpoints[zone] = types.NewNetworkEndpointSet()
+	}
+	for i := 0; i < currentlyMigratingCount; i++ {
+		endpoint, ok := migrationEndpoints[zone].PopAny()
+		if !ok {
+			break
+		}
+		removeEndpoints[zone].Insert(endpoint)
+	}
+
+	return zone
+}
+
+// Returns true if there are many endpoints waiting to be attached.
+//
+// Refer to the implementation below to get the exact heuristic being used to
+// determine this.
+func (d *Migrator) manyEndpointsWaitingToBeAttached(addCount, committedCount, migrationCount int) bool {
+	return addCount >= int(math.Ceil(float64(committedCount+migrationCount)*d.fractionForPendingAttachmentThreshold))
+}
+
+// Returns true if the time since the last successful detach exceeds the
+// previousDetachThreshold.
+func (d *Migrator) tooLongSincePreviousDetach() bool {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	return time.Since(d.previousDetach) >= d.previousDetachThreshold
+}
+
 // findAndFilterMigrationEndpoints will filter out the migration endpoints from
 // the `addEndpoints` and `removeEndpoints` sets. The passed sets will get
 // modified. The returned value will be two endpoints sets which will contain
@@ -258,3 +350,11 @@ func moveEndpoint(e types.NetworkEndpoint, source, dest map[string]types.Network
 	}
 	return false
 }
+
+func endpointsCount(endpointSets map[string]types.NetworkEndpointSet) int {
+	var count int
+	for _, endpointSet := range endpointSets {
+		count += endpointSet.Len()
+	}
+	return count
+}
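As an aside for reviewers (not part of the patch), the following self-contained sketch mirrors the default fractions and the ceil-based calculation added above and shows how many endpoints the heuristic would detach for the endpoint counts exercised in the tests below; `detachCount` is a hypothetical helper used only for illustration:

```go
package main

import (
	"fmt"
	"math"
)

// Default fractions mirrored from the patch above.
const (
	fractionOfMigratingEndpoints          = 0.1
	fractionForPendingAttachmentThreshold = 0.5
)

// detachCount is a simplified rendering of the heuristic: detach
// ceil(fraction * (committed + migration)) endpoints, but never more than the
// largest single zone holds.
func detachCount(committed, migration, maxZone int) int {
	n := int(math.Ceil(float64(committed+migration) * fractionOfMigratingEndpoints))
	if n > maxZone {
		n = maxZone
	}
	return n
}

func main() {
	// 5 committed + 5 migrating endpoints in one zone: ceil(10 * 0.1) = 1.
	fmt.Println(detachCount(5, 5, 5)) // 1
	// 5 committed + 6 migrating endpoints in one zone: ceil(11 * 0.1) = 2.
	fmt.Println(detachCount(5, 6, 6)) // 2
	// "Too many pending attachments" check: 5 adds vs. ceil((1+1) * 0.5) = 1,
	// so detachments are skipped unless the previous detach is stale.
	fmt.Println(5 >= int(math.Ceil(float64(1+1)*fractionForPendingAttachmentThreshold))) // true
}
```

The clamp to the largest zone is what the "endpoints from multiple zones should not be detached at once" test case below verifies.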
diff --git a/pkg/neg/syncers/dualstack/migrator_test.go b/pkg/neg/syncers/dualstack/migrator_test.go
index dc42045156..12a244e6d5 100644
--- a/pkg/neg/syncers/dualstack/migrator_test.go
+++ b/pkg/neg/syncers/dualstack/migrator_test.go
@@ -22,9 +22,79 @@ func TestFilter(t *testing.T) {
 		wantRemoveEndpoints map[string]types.NetworkEndpointSet
 		wantMigrationZone   bool
 	}{
+		{
+			desc: "paused migrator should only filter migration endpoints and not start detachment",
+			migrator: func() *Migrator {
+				m := newMigratorForTest(true)
+				m.Pause()
+				return m
+			}(),
+			addEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "a", IPv6: "A"}, // migrating
+					{IP: "b"},
+				}...),
+			},
+			removeEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "a"}, // migrating
+					{IP: "c", IPv6: "C"},
+				}...),
+			},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IPv6: "D"},
+				}...),
+			},
+			wantAddEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "b"},
+				}...),
+			},
+			wantRemoveEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					// Migration-endpoints were filtered out but no new migration
+					// detachment was started.
+					{IP: "c", IPv6: "C"},
+				}...),
+			},
+		},
+		{
+			desc:     "unpaused migrator should filter migration endpoints AND also start detachment",
+			migrator: newMigratorForTest(true),
+			addEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "a", IPv6: "A"}, // migrating
+					{IP: "b"},
+				}...),
+			},
+			removeEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "a"}, // migrating
+					{IP: "c", IPv6: "C"},
+				}...),
+			},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IPv6: "D"},
+				}...),
+			},
+			wantAddEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "b"},
+				}...),
+			},
+			wantRemoveEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "a"}, // Migration detachment started.
+					{IP: "c", IPv6: "C"},
+				}...),
+			},
+			wantMigrationZone: true,
+		},
 		{
 			desc:     "migrator should do nothing if enableDualStack is false",
-			migrator: &Migrator{enableDualStack: false},
+			migrator: newMigratorForTest(false),
 			addEndpoints: map[string]types.NetworkEndpointSet{
 				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
 					{IP: "a", IPv6: "A"}, // migrating
@@ -227,6 +297,232 @@ func (f *fakeSyncable) Sync() bool {
 	return true
 }
 
+func TestContinue_NoInputError_ShouldChangeTimeSincePreviousDetach(t *testing.T) {
+	t.Parallel()
+
+	syncable := &fakeSyncable{}
+
+	migrator := &Migrator{
+		enableDualStack:         true,
+		paused:                  true,
+		previousDetachThreshold: 20 * time.Millisecond,
+		syncer:                  syncable,
+		logger:                  klog.Background(),
+	}
+
+	// Ensure that before Continue, tooLongSincePreviousDetach() returns true.
+	if !migrator.tooLongSincePreviousDetach() {
+		t.Fatalf("Precondition failed; tooLongSincePreviousDetach() = 'false'; want 'true' before calling Continue.")
+	}
+
+	migrator.Continue(nil)
+
+	// Ensure that immediately after calling Continue,
+	// tooLongSincePreviousDetach() returns false.
+	if migrator.tooLongSincePreviousDetach() {
+		t.Errorf("tooLongSincePreviousDetach() = 'true'; want 'false' immediately after calling Continue()")
+	}
+
+	// Ensure that previousDetachThreshold time after calling Continue,
+	// tooLongSincePreviousDetach() returns true.
+	time.Sleep(migrator.previousDetachThreshold)
+	if !migrator.tooLongSincePreviousDetach() {
+		t.Errorf("tooLongSincePreviousDetach() = 'false'; want 'true' after previousDetachThreshold time has elapsed")
+	}
+}
+
+func TestCalculateMigrationEndpointsToDetach(t *testing.T) {
+	testCases := []struct {
+		desc                        string
+		addEndpoints                map[string]types.NetworkEndpointSet
+		removeEndpoints             map[string]types.NetworkEndpointSet
+		committedEndpoints          map[string]types.NetworkEndpointSet
+		migrationEndpoints          map[string]types.NetworkEndpointSet
+		migrator                    *Migrator
+		wantCurrentlyMigratingCount int
+	}{
+		{
+			desc:            "less than or equal to 10 (committed + migration) endpoints should only detach 1 at a time",
+			addEndpoints:    map[string]types.NetworkEndpointSet{},
+			removeEndpoints: map[string]types.NetworkEndpointSet{},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
+				}...),
+			},
+			migrationEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "6"}, {IP: "7"}, {IP: "8"}, {IP: "9"}, {IP: "10"},
+				}...),
+			},
+			migrator:                    newMigratorForTest(true),
+			wantCurrentlyMigratingCount: 1,
+		},
+		{
+			desc:            "more than 10 (committed + migration) endpoints can detach more than 1 at a time",
+			addEndpoints:    map[string]types.NetworkEndpointSet{},
+			removeEndpoints: map[string]types.NetworkEndpointSet{},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
+				}...),
+			},
+			migrationEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "6"}, {IP: "7"}, {IP: "8"}, {IP: "9"}, {IP: "10"},
+					{IP: "11"},
+				}...),
+			},
+			migrator:                    newMigratorForTest(true),
+			wantCurrentlyMigratingCount: 2,
+		},
+		{
+			// If there are many endpoints waiting to be attached and the most recent
+			// migration was not too long ago, then we will not start any new
+			// detachments since we wait for the pending attaches to complete.
+			desc: "many endpoints are waiting to be attached AND previous migration was quite recent",
+			addEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
+				}...),
+			},
+			removeEndpoints: map[string]types.NetworkEndpointSet{},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "6"},
+				}...),
+			},
+			migrationEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "7"},
+				}...),
+			},
+			migrator: func() *Migrator {
+				m := newMigratorForTest(true)
+				m.previousDetach = time.Now().Add(30 * time.Minute) // Future time.
+				return m
+			}(),
+			wantCurrentlyMigratingCount: 0,
+		},
+		{
+			// If there are many endpoints waiting to be attached but the most recent
+			// migration was too long ago, then we don't want to keep waiting
+			// indefinitely for the next detach and we proceed with the detachments.
+			desc: "many endpoints are waiting to be attached BUT previous migration was too long ago",
+			addEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"}, {IP: "5"},
+				}...),
+			},
+			removeEndpoints: map[string]types.NetworkEndpointSet{},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "6"},
+				}...),
+			},
+			migrationEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "7"},
+				}...),
+			},
+			migrator:                    newMigratorForTest(true),
+			wantCurrentlyMigratingCount: 1,
+		},
+		{
+			desc: "no detachments started since nothing to migrate",
+			addEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"},
+				}...),
+			},
+			removeEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "2"},
+				}...),
+			},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "3"}, {IP: "4"}, {IP: "5"}, {IP: "6"},
+				}...),
+			},
+			migrationEndpoints:          map[string]types.NetworkEndpointSet{},
+			migrator:                    newMigratorForTest(true),
+			wantCurrentlyMigratingCount: 0,
+		},
+		{
+			// If our calculations suggest that the number of endpoints to migrate is
+			// more than the number of endpoints in any single zone, we should not
+			// include endpoints from multiple zones.
+			desc:            "endpoints from multiple zones should not be detached at once",
+			addEndpoints:    map[string]types.NetworkEndpointSet{},
+			removeEndpoints: map[string]types.NetworkEndpointSet{},
+			committedEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "1"}, {IP: "2"}, {IP: "3"}, {IP: "4"},
+				}...),
+				"zone2": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "5"}, {IP: "6"}, {IP: "7"}, {IP: "8"},
+				}...),
+			},
+			migrationEndpoints: map[string]types.NetworkEndpointSet{
+				"zone1": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "9"}, {IP: "10"},
+				}...),
+				"zone2": types.NewNetworkEndpointSet([]types.NetworkEndpoint{
+					{IP: "11"}, {IP: "12"},
+				}...),
+			},
+			migrator:                    newMigratorForTest(true),
+			wantCurrentlyMigratingCount: 2,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			clonedAddEndpoints := cloneZoneNetworkEndpointsMap(tc.addEndpoints)
+			clonedRemoveEndpoints := cloneZoneNetworkEndpointsMap(tc.removeEndpoints)
+			clonedCommittedEndpoints := cloneZoneNetworkEndpointsMap(tc.committedEndpoints)
+			clonedMigrationEndpoints := cloneZoneNetworkEndpointsMap(tc.migrationEndpoints)
+
+			migrationZone := tc.migrator.calculateMigrationEndpointsToDetach(tc.addEndpoints, tc.removeEndpoints, tc.committedEndpoints, tc.migrationEndpoints)
+
+			if tc.wantCurrentlyMigratingCount > 0 && migrationZone == "" {
+				t.Fatalf("calculateMigrationEndpointsToDetach(...) returned empty zone which means no migration detachment was started; want %v endpoints to undergo detachment", tc.wantCurrentlyMigratingCount)
+			}
+
+			// Ensure that we didn't modify the addEndpoints and committedEndpoints.
+			if diff := cmp.Diff(clonedAddEndpoints, tc.addEndpoints); diff != "" {
+				t.Errorf("Unexpected diff in addEndpoints; calculateMigrationEndpointsToDetach(...) should not modify addEndpoints; (-want +got):\n%s", diff)
+			}
+			if diff := cmp.Diff(clonedCommittedEndpoints, tc.committedEndpoints); diff != "" {
+				t.Errorf("Unexpected diff in committedEndpoints; calculateMigrationEndpointsToDetach(...) should not modify committedEndpoints; (-want +got):\n%s", diff)
+			}
+
+			// Ensure that the correct number of endpoints were removed from
+			// "migrationEndpoints" and added to "removeEndpoints".
+			if gotCurrentlyMigratingCount := endpointsCount(clonedMigrationEndpoints) - endpointsCount(tc.migrationEndpoints); gotCurrentlyMigratingCount != tc.wantCurrentlyMigratingCount {
+				t.Errorf("Unexpected number of endpoints removed from migrationEndpoints set; got removed count = %v; want = %v", gotCurrentlyMigratingCount, tc.wantCurrentlyMigratingCount)
+			}
+			if gotCurrentlyMigratingCount := endpointsCount(tc.removeEndpoints) - endpointsCount(clonedRemoveEndpoints); gotCurrentlyMigratingCount != tc.wantCurrentlyMigratingCount {
+				t.Errorf("Unexpected number of endpoints added to removeEndpoints set; got newly added count = %v; want = %v", gotCurrentlyMigratingCount, tc.wantCurrentlyMigratingCount)
+			}
+
+			// Ensure that only the endpoints from the migrationZone were modified.
+			removedMigrationEndpoints := clonedMigrationEndpoints[migrationZone].Difference(tc.migrationEndpoints[migrationZone])
+			if gotCurrentlyMigratingCount := removedMigrationEndpoints.Len(); gotCurrentlyMigratingCount != tc.wantCurrentlyMigratingCount {
+				t.Errorf("Unexpected number of endpoints removed from migrationEndpoints[%v] set; got removed count = %v; want = %v", migrationZone, gotCurrentlyMigratingCount, tc.wantCurrentlyMigratingCount)
+			}
+
+			// Ensure that all the endpoints removed from migrationEndpoints were
+			// added to the removeEndpoints.
+			newlyAddedEndpointsWithinRemoveSet := tc.removeEndpoints[migrationZone].Difference(clonedRemoveEndpoints[migrationZone])
+			if diff := cmp.Diff(removedMigrationEndpoints, newlyAddedEndpointsWithinRemoveSet); diff != "" {
+				t.Errorf("Unexpected diff between the endpoints removed from migrationEndpoints[%v] and endpoints added to removeEndpoints[%v] (-want +got):\n%s", migrationZone, migrationZone, diff)
+			}
+		})
+	}
+}
+
 func TestFindAndFilterMigrationEndpoints(t *testing.T) {
 	testCases := []struct {
 		name string