Skip to content

Commit

Permalink
Changed port info map as well.
Browse files Browse the repository at this point in the history
  • Loading branch information
prameshj committed Jul 29, 2020
1 parent 7051250 commit 387db86
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func (c *Controller) mergeVmIpNEGsPortInfo(service *apiv1.Service, name types.Na
// Update usage metrics.
negUsage.VmIpNeg = usage.NewVmIpNegType(onlyLocal)

return portInfoMap.Merge(negtypes.NewPortInfoMapForVMIPNEG(name.Namespace, name.Name, c.namer, !onlyLocal))
return portInfoMap.Merge(negtypes.NewPortInfoMapForVMIPNEG(name.Namespace, name.Name, c.namer, onlyLocal))
}

// mergeDefaultBackendServicePortInfoMap merge the PortInfoMap for the default backend service into portInfoMap
Expand Down
12 changes: 6 additions & 6 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ func TestEnableNEGServiceWithL4ILB(t *testing.T) {
controller.runL4 = true
defer controller.stop()
var prevSyncerKey, updatedSyncerKey negtypes.NegSyncerKey
randomize := false
localMode := false
t.Logf("Creating L4 ILB service with ExternalTrafficPolicy:Cluster")
controller.serviceLister.Add(newTestILBService(controller, !randomize, 80))
controller.serviceLister.Add(newTestILBService(controller, localMode, 80))
svcClient := controller.client.CoreV1().Services(testServiceNamespace)
svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
err := controller.processService(svcKey)
Expand All @@ -303,7 +303,7 @@ func TestEnableNEGServiceWithL4ILB(t *testing.T) {
t.Fatalf("Service was not created.(*apiv1.Service) successfully, err: %v", err)
}
expectedPortInfoMap := negtypes.NewPortInfoMapForVMIPNEG(testServiceNamespace, testServiceName,
controller.namer, randomize)
controller.namer, localMode)
// There will be only one entry in the map
for key, val := range expectedPortInfoMap {
prevSyncerKey = getSyncerKey(testServiceNamespace, testServiceName, key, val)
Expand All @@ -313,15 +313,15 @@ func TestEnableNEGServiceWithL4ILB(t *testing.T) {
validateServiceAnnotationWithPortInfoMap(t, svc, expectedPortInfoMap)
// Now Update the service to change the TrafficPolicy
t.Logf("Updating L4 ILB service from ExternalTrafficPolicy:Cluster to Local")
randomize = !randomize
if err = controller.serviceLister.Update(updateTestILBService(controller, !randomize, svc)); err != nil {
localMode = true
if err = controller.serviceLister.Update(updateTestILBService(controller, localMode, svc)); err != nil {
t.Fatalf("Failed to update test L4 ILB service: %v", err)
}
if err = controller.processService(svcKey); err != nil {
t.Fatalf("Failed to process updated L4 ILB srvice: %v", err)
}
expectedPortInfoMap = negtypes.NewPortInfoMapForVMIPNEG(testServiceNamespace, testServiceName,
controller.namer, randomize)
controller.namer, localMode)
// There will be only one entry in the map
for key, val := range expectedPortInfoMap {
updatedSyncerKey = getSyncerKey(testServiceNamespace, testServiceName, key, val)
Expand Down
7 changes: 2 additions & 5 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
if !ok {
// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter,
syncerKey, portInfo.RandomizeEndpoints)
syncerKey, portInfo.EpCalculatorMode)
syncer = negsyncer.NewTransactionSyncer(
syncerKey,
portInfo.NegName,
Expand Down Expand Up @@ -338,10 +338,7 @@ func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey
}
if portInfo.PortTuple.Empty() {
networkEndpointType = negtypes.VmIpEndpointType
calculatorMode = negtypes.L4LocalMode
if portInfo.RandomizeEndpoints {
calculatorMode = negtypes.L4ClusterMode
}
calculatorMode = portInfo.EpCalculatorMode
}

return negtypes.NegSyncerKey{
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
// The L7 implementation is tested in TestToZoneNetworkEndpointMapUtil.
func TestLocalGetEndpointSet(t *testing.T) {
t.Parallel()
_, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), false)
mode := negtypes.L4LocalMode
_, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), mode)
nodeNames := []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6}
for i := 0; i < len(nodeNames); i++ {
err := transactionSyncer.nodeLister.Add(&v1.Node{
Expand Down
8 changes: 5 additions & 3 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,16 @@ func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGro
return syncer
}

func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, randomizeEndpoints bool) negtypes.NetworkEndpointsCalculator {
func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode) negtypes.NetworkEndpointsCalculator {
serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/")
if syncerKey.NegType == negtypes.VmIpEndpointType {
nodeLister := listers.NewNodeLister(nodeLister)
if randomizeEndpoints {
switch mode {
case negtypes.L4LocalMode:
return NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, serviceKey)
default:
return NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, serviceKey)
}
return NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, serviceKey)
}
return NewL7EndpointsCalculator(zoneGetter, podLister, syncerKey.PortTuple.Name,
syncerKey.SubsetLabels, syncerKey.NegType)
Expand Down
8 changes: 5 additions & 3 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,9 +836,9 @@ func TestCommitPods(t *testing.T) {
}
}

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, randomize bool) (negtypes.NegSyncer, *transactionSyncer) {
func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, randomize)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode)
return negsyncer, ts
}

Expand All @@ -861,10 +861,12 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
TargetPort: "8080",
},
}
var mode negtypes.EndpointsCalculatorMode
if negType == negtypes.VmIpEndpointType {
svcPort.PortTuple.Port = 0
svcPort.PortTuple.TargetPort = ""
svcPort.PortTuple.Name = string(negtypes.VmIpEndpointType)
mode = negtypes.L4LocalMode
}

// TODO(freehan): use real readiness reflector
Expand All @@ -881,7 +883,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
context.NodeInformer.GetIndexer(),
reflector,
GetEndpointsCalculator(context.NodeInformer.GetIndexer(), context.PodInformer.GetIndexer(), negtypes.NewFakeZoneGetter(),
svcPort, false))
svcPort, mode))
transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer)
return negsyncer, transactionSyncer
}
Expand Down
25 changes: 14 additions & 11 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ type PortInfo struct {
// This is enabled with service port is reference by ingress.
// If the service port is only exposed as stand alone NEG, it should not be enbled.
ReadinessGate bool
// RandomizeEndpoints indicates if the endpoints for the NEG associated with this port need to
// EpCalculatorMode indicates if the endpoints for the NEG associated with this port need to
// be selected at random, rather than selecting the endpoints of this service. This is applicable
// in GCE_VM_IP NEGs where the endpoints are the nodes instead of pods.
RandomizeEndpoints bool
EpCalculatorMode EndpointsCalculatorMode
}

// PortInfoMapKey is the Key of PortInfoMap
Expand Down Expand Up @@ -144,18 +144,22 @@ func NewPortInfoMap(namespace, name string, svcPortTupleSet SvcPortTupleSet, nam

// NewPortInfoMapForVMIPNEG creates PortInfoMap with empty port tuple. Since VM_IP NEGs target
// the node instead of the pod, there is no port info to be stored.
func NewPortInfoMapForVMIPNEG(namespace, name string, namer NetworkEndpointGroupNamer, randomize bool) PortInfoMap {
func NewPortInfoMapForVMIPNEG(namespace, name string, namer NetworkEndpointGroupNamer, local bool) PortInfoMap {
ret := PortInfoMap{}
svcPortSet := make(SvcPortTupleSet)
svcPortSet.Insert(
// Insert Empty PortTuple for VmIp NEGs.
SvcPortTuple{},
)
for svcPortTuple := range svcPortSet {
mode := L4ClusterMode
if local {
mode = L4LocalMode
}
ret[PortInfoMapKey{svcPortTuple.Port, ""}] = PortInfo{
PortTuple: svcPortTuple,
NegName: namer.VMIPNEG(namespace, name),
RandomizeEndpoints: randomize,
PortTuple: svcPortTuple,
NegName: namer.VMIPNEG(namespace, name),
EpCalculatorMode: mode,
}
}
return ret
Expand Down Expand Up @@ -192,8 +196,7 @@ func NewPortInfoMapWithDestinationRule(namespace, name string, svcPortTupleSet S
// It assumes the same key (service port) will have the same target port and negName
// If not, it will throw error
// If a key in p1 or p2 has readiness gate enabled, the merged port info will also has readiness gate enabled
// If a key in p1 or p2 has randomize endpoints enabled, the merged port info will also has randomize endpoints enabled.
// This field is only applicable for VMPrimaryIP NEGs.
// The merged port info will have the same Endpoints Calculator mode as p1 and p2. This field is only needed for VMPrimaryIP NEGs.
func (p1 PortInfoMap) Merge(p2 PortInfoMap) error {
var err error
for mapKey, portInfo := range p2 {
Expand All @@ -208,16 +211,16 @@ func (p1 PortInfoMap) Merge(p2 PortInfoMap) error {
if existingPortInfo.Subset != portInfo.Subset {
return fmt.Errorf("for service port %v, Subset name in existing map is %q, but the merge map has %q", mapKey, existingPortInfo.Subset, portInfo.Subset)
}
if existingPortInfo.RandomizeEndpoints != portInfo.RandomizeEndpoints {
return fmt.Errorf("For service port %v, Existing map has RandomizeEndpoints %v, but the merge map has %v", mapKey, existingPortInfo.RandomizeEndpoints, portInfo.RandomizeEndpoints)
if existingPortInfo.EpCalculatorMode != portInfo.EpCalculatorMode {
return fmt.Errorf("For service port %v, Existing map has Calculator mode %v, but the merge map has %v", mapKey, existingPortInfo.EpCalculatorMode, portInfo.EpCalculatorMode)
}
mergedInfo.ReadinessGate = existingPortInfo.ReadinessGate
}
mergedInfo.PortTuple = portInfo.PortTuple
mergedInfo.NegName = portInfo.NegName
// Turn on the readiness gate if one of them is on
mergedInfo.ReadinessGate = mergedInfo.ReadinessGate || portInfo.ReadinessGate
mergedInfo.RandomizeEndpoints = portInfo.RandomizeEndpoints
mergedInfo.EpCalculatorMode = portInfo.EpCalculatorMode
mergedInfo.Subset = portInfo.Subset
mergedInfo.SubsetLabels = portInfo.SubsetLabels

Expand Down

0 comments on commit 387db86

Please sign in to comment.