Skip to content

Commit

Permalink
Merge pull request #128 from nicksardo/rename-port
Browse files Browse the repository at this point in the history
Rename Port to NodePort
  • Loading branch information
nicksardo authored Feb 6, 2018
2 parents 5291f96 + e4350fa commit 19fb049
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 85 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func DefaultBackendServicePort(kubeClient kubernetes.Interface) *backends.Servic
}

return &backends.ServicePort{
Port: int64(nodePort),
NodePort: int64(nodePort),
Protocol: annotations.ProtocolHTTP, // The default backend is HTTP.
SvcName: types.NamespacedName{Namespace: parts[0], Name: parts[1]},
SvcPort: intstr.FromInt(int(port)),
Expand Down
30 changes: 14 additions & 16 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,10 @@ func portKey(port int64) string {

// ServicePort for tupling port and protocol
type ServicePort struct {
// Port is the service node port
// TODO: rename it to NodePort
Port int64
Protocol annotations.AppProtocol
SvcName types.NamespacedName
SvcPort intstr.IntOrString
NodePort int64
Protocol annotations.AppProtocol
SvcTargetPort string
NEGEnabled bool
}
Expand Down Expand Up @@ -180,8 +178,8 @@ func (b *Backends) Get(port int64) (*compute.BackendService, error) {
}

func (b *Backends) ensureHealthCheck(sp ServicePort) (string, error) {
hc := b.healthChecker.New(sp.Port, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.Port)
hc := b.healthChecker.New(sp.NodePort, sp.Protocol, sp.NEGEnabled)
existingLegacyHC, err := b.healthChecker.GetLegacy(sp.NodePort)
if err != nil && !utils.IsNotFoundError(err) {
return "", err
}
Expand Down Expand Up @@ -228,7 +226,7 @@ func (b *Backends) Ensure(svcPorts []ServicePort, igs []*compute.InstanceGroup)
if igs == nil {
ports := []int64{}
for _, p := range svcPorts {
ports = append(ports, p.Port)
ports = append(ports, p.NodePort)
}
var err error
igs, err = instances.EnsureInstanceGroupsAndPorts(b.nodePool, b.namer, ports)
Expand All @@ -252,7 +250,7 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
// We must track the ports even if creating the backends failed, because
// we might've created health-check for them.
be := &compute.BackendService{}
defer func() { b.snapshotter.Add(portKey(p.Port), be) }()
defer func() { b.snapshotter.Add(portKey(p.NodePort), be) }()

var err error

Expand All @@ -263,14 +261,14 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
}

// Verify existance of a backend service for the proper port, but do not specify any backends/igs
beName := b.namer.Backend(p.Port)
be, _ = b.Get(p.Port)
beName := b.namer.Backend(p.NodePort)
be, _ = b.Get(p.NodePort)
if be == nil {
namedPort := &compute.NamedPort{
Name: b.namer.NamedPort(p.Port),
Port: p.Port,
Name: b.namer.NamedPort(p.NodePort),
Port: p.NodePort,
}
glog.V(2).Infof("Creating backend service for port %v named port %v", p.Port, namedPort)
glog.V(2).Infof("Creating backend service for port %v named port %v", p.NodePort, namedPort)
be, err = b.create(namedPort, hcLink, p, beName)
if err != nil {
return err
Expand Down Expand Up @@ -302,7 +300,7 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr

// If previous health check was legacy type, we need to delete it.
if existingHCLink != hcLink && strings.Contains(existingHCLink, "/httpHealthChecks/") {
if err = b.healthChecker.DeleteLegacy(p.Port); err != nil {
if err = b.healthChecker.DeleteLegacy(p.NodePort); err != nil {
glog.Warning("Failed to delete legacy HttpHealthCheck %v; Will not try again, err: %v", beName, err)
}
}
Expand Down Expand Up @@ -455,7 +453,7 @@ func (b *Backends) edgeHop(be *compute.BackendService, igs []*compute.InstanceGr
func (b *Backends) GC(svcNodePorts []ServicePort) error {
knownPorts := sets.NewString()
for _, p := range svcNodePorts {
knownPorts.Insert(portKey(p.Port))
knownPorts.Insert(portKey(p.NodePort))
}
pool := b.snapshotter.Snapshot()
for port := range pool {
Expand Down Expand Up @@ -516,7 +514,7 @@ func (b *Backends) Link(port ServicePort, zones []string) error {
negs = append(negs, neg)
}

backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.Backend(port.Port))
backendService, err := b.cloud.GetAlphaGlobalBackendService(b.namer.Backend(port.NodePort))
if err != nil {
return err
}
Expand Down
88 changes: 44 additions & 44 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithClo
healthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
healthChecks := healthchecks.NewHealthChecker(healthCheckProvider, "/", defaultNamer)
bp := NewBackendPool(f, negGetter, healthChecks, nodePool, defaultNamer, []int64{}, syncWithCloud)
probes := map[ServicePort]*api_v1.Probe{{Port: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
probes := map[ServicePort]*api_v1.Probe{{NodePort: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
bp.Init(NewFakeProbeProvider(probes))

return bp, healthCheckProvider
Expand All @@ -76,54 +76,54 @@ func TestBackendPoolAdd(t *testing.T) {
pool, _ := newTestJig(f, fakeIGs, false)

testCases := []ServicePort{
{Port: 80, Protocol: annotations.ProtocolHTTP},
{Port: 443, Protocol: annotations.ProtocolHTTPS},
{NodePort: 80, Protocol: annotations.ProtocolHTTP},
{NodePort: 443, Protocol: annotations.ProtocolHTTPS},
}

for _, nodePort := range testCases {
for _, sp := range testCases {
// For simplicity, these tests use 80/443 as nodeports
t.Run(fmt.Sprintf("Port:%v Protocol:%v", nodePort.Port, nodePort.Protocol), func(t *testing.T) {
t.Run(fmt.Sprintf("Port:%v Protocol:%v", sp.NodePort, sp.Protocol), func(t *testing.T) {
// Add a backend for a port, then re-add the same port and
// make sure it corrects a broken link from the backend to
// the instance group.
err := pool.Ensure([]ServicePort{nodePort}, nil)
err := pool.Ensure([]ServicePort{sp}, nil)
if err != nil {
t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", nodePort, err)
t.Fatalf("Did not find expect error when adding a nodeport: %v, err: %v", sp, err)
}
beName := defaultNamer.Backend(nodePort.Port)
beName := defaultNamer.Backend(sp.NodePort)

// Check that the new backend has the right port
be, err := f.GetGlobalBackendService(beName)
if err != nil {
t.Fatalf("Did not find expected backend %v", beName)
}
if be.Port != nodePort.Port {
t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, nodePort)
if be.Port != sp.NodePort {
t.Fatalf("Backend %v has wrong port %v, expected %v", be.Name, be.Port, sp)
}

// Check that the instance group has the new port.
ig, err := fakeIGs.GetInstanceGroup(defaultNamer.InstanceGroup(), defaultZone)
var found bool
for _, port := range ig.NamedPorts {
if port.Port == nodePort.Port {
if port.Port == sp.NodePort {
found = true
}
}
if !found {
t.Fatalf("Port %v not added to instance group", nodePort)
t.Fatalf("Port %v not added to instance group", sp)
}

// Check the created healthcheck is the correct protocol
hc, err := pool.healthChecker.Get(nodePort.Port, false)
hc, err := pool.healthChecker.Get(sp.NodePort, false)
if err != nil {
t.Fatalf("Unexpected err when querying fake healthchecker: %v", err)
}

if hc.Protocol() != nodePort.Protocol {
t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), nodePort.Protocol)
if hc.Protocol() != sp.Protocol {
t.Fatalf("Healthcheck scheme does not match nodeport scheme: hc:%v np:%v", hc.Protocol(), sp.Protocol)
}

if nodePort.Port == 443 && hc.RequestPath != "/my-special-path" {
if sp.NodePort == 443 && hc.RequestPath != "/my-special-path" {
t.Fatalf("Healthcheck for 443 should have special request path from probe")
}
})
Expand All @@ -135,11 +135,11 @@ func TestHealthCheckMigration(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, hcp := newTestJig(f, fakeIGs, false)

p := ServicePort{Port: 7000, Protocol: annotations.ProtocolHTTP}
p := ServicePort{NodePort: 7000, Protocol: annotations.ProtocolHTTP}

// Create a legacy health check and insert it into the HC provider.
legacyHC := &compute.HttpHealthCheck{
Name: defaultNamer.Backend(p.Port),
Name: defaultNamer.Backend(p.NodePort),
RequestPath: "/my-healthz-path",
Host: "k8s.io",
Description: "My custom HC",
Expand All @@ -152,7 +152,7 @@ func TestHealthCheckMigration(t *testing.T) {
pool.Ensure([]ServicePort{p}, nil)

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(p.Port, false)
hc, _ := pool.healthChecker.Get(p.NodePort, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand All @@ -172,9 +172,9 @@ func TestBackendPoolUpdate(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, _ := newTestJig(f, fakeIGs, false)

p := ServicePort{Port: 3000, Protocol: annotations.ProtocolHTTP}
p := ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP}
pool.Ensure([]ServicePort{p}, nil)
beName := defaultNamer.Backend(p.Port)
beName := defaultNamer.Backend(p.NodePort)

be, err := f.GetGlobalBackendService(beName)
if err != nil {
Expand All @@ -186,7 +186,7 @@ func TestBackendPoolUpdate(t *testing.T) {
}

// Assert the proper health check was created
hc, _ := pool.healthChecker.Get(p.Port, false)
hc, _ := pool.healthChecker.Get(p.NodePort, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand All @@ -206,7 +206,7 @@ func TestBackendPoolUpdate(t *testing.T) {
}

// Assert the proper health check was created
hc, _ = pool.healthChecker.Get(p.Port, false)
hc, _ = pool.healthChecker.Get(p.NodePort, false)
if hc == nil || hc.Protocol() != p.Protocol {
t.Fatalf("Expected %s health check, received %v: ", p.Protocol, hc)
}
Expand All @@ -217,9 +217,9 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, _ := newTestJig(f, fakeIGs, false)

nodePort := ServicePort{Port: 8080, Protocol: annotations.ProtocolHTTP}
pool.Ensure([]ServicePort{nodePort}, nil)
beName := defaultNamer.Backend(nodePort.Port)
sp := ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP}
pool.Ensure([]ServicePort{sp}, nil)
beName := defaultNamer.Backend(sp.NodePort)

be, _ := f.GetGlobalBackendService(beName)

Expand All @@ -231,7 +231,7 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
f.calls = []int{}
f.UpdateGlobalBackendService(be)

pool.Ensure([]ServicePort{nodePort}, nil)
pool.Ensure([]ServicePort{sp}, nil)
for _, call := range f.calls {
if call == utils.Create {
t.Fatalf("Unexpected create for existing backend service")
Expand Down Expand Up @@ -260,12 +260,12 @@ func TestBackendPoolChaosMonkey(t *testing.T) {
func TestBackendPoolSync(t *testing.T) {
// Call sync on a backend pool with a list of ports, make sure the pool
// creates/deletes required ports.
svcNodePorts := []ServicePort{{Port: 81, Protocol: annotations.ProtocolHTTP}, {Port: 82, Protocol: annotations.ProtocolHTTPS}, {Port: 83, Protocol: annotations.ProtocolHTTP}}
svcNodePorts := []ServicePort{{NodePort: 81, Protocol: annotations.ProtocolHTTP}, {NodePort: 82, Protocol: annotations.ProtocolHTTPS}, {NodePort: 83, Protocol: annotations.ProtocolHTTP}}
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, _ := newTestJig(f, fakeIGs, true)
pool.Ensure([]ServicePort{{Port: 81}}, nil)
pool.Ensure([]ServicePort{{Port: 90}}, nil)
pool.Ensure([]ServicePort{{NodePort: 81}}, nil)
pool.Ensure([]ServicePort{{NodePort: 90}}, nil)
if err := pool.Ensure(svcNodePorts, nil); err != nil {
t.Errorf("Expected backend pool to add node ports, err: %v", err)
}
Expand All @@ -276,19 +276,19 @@ func TestBackendPoolSync(t *testing.T) {
t.Fatalf("Did not expect to find port 90")
}
for _, port := range svcNodePorts {
if _, err := pool.Get(port.Port); err != nil {
if _, err := pool.Get(port.NodePort); err != nil {
t.Fatalf("Expected to find port %v", port)
}
}

svcNodePorts = []ServicePort{{Port: 81}}
deletedPorts := []ServicePort{{Port: 82}, {Port: 83}}
svcNodePorts = []ServicePort{{NodePort: 81}}
deletedPorts := []ServicePort{{NodePort: 82}, {NodePort: 83}}
if err := pool.GC(svcNodePorts); err != nil {
t.Fatalf("Expected backend pool to GC, err: %v", err)
}

for _, port := range deletedPorts {
if _, err := pool.Get(port.Port); err == nil {
if _, err := pool.Get(port.NodePort); err == nil {
t.Fatalf("Pool contains %v after deletion", port)
}
}
Expand All @@ -304,7 +304,7 @@ func TestBackendPoolSync(t *testing.T) {
}

// This backend should get deleted again since it is managed by this cluster.
f.CreateGlobalBackendService(&compute.BackendService{Name: defaultNamer.Backend(deletedPorts[0].Port)})
f.CreateGlobalBackendService(&compute.BackendService{Name: defaultNamer.Backend(deletedPorts[0].NodePort)})

// TODO: Avoid casting.
// Repopulate the pool with a cloud list, which now includes the 82 port
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) {
})

// Have pool sync the above backend service
bp.Ensure([]ServicePort{{Port: 80, Protocol: annotations.ProtocolHTTPS}}, nil)
bp.Ensure([]ServicePort{{NodePort: 80, Protocol: annotations.ProtocolHTTPS}}, nil)

// Verify the legacy health check has been deleted
_, err = hcp.GetHttpHealthCheck(beName)
Expand All @@ -390,7 +390,7 @@ func TestBackendPoolShutdown(t *testing.T) {
pool, _ := newTestJig(f, fakeIGs, false)

// Add a backend-service and verify that it doesn't exist after Shutdown()
pool.Ensure([]ServicePort{{Port: 80}}, nil)
pool.Ensure([]ServicePort{{NodePort: 80}}, nil)
pool.Shutdown()
if _, err := f.GetGlobalBackendService(defaultNamer.Backend(80)); err == nil {
t.Fatalf("%v", err)
Expand All @@ -403,7 +403,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
pool, _ := newTestJig(f, fakeIGs, false)

// This will add the instance group k8s-ig to the instance pool
pool.Ensure([]ServicePort{{Port: 80}}, nil)
pool.Ensure([]ServicePort{{NodePort: 80}}, nil)

be, err := f.GetGlobalBackendService(defaultNamer.Backend(80))
if err != nil {
Expand All @@ -421,7 +421,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
}

// Make sure repeated adds don't clobber the inserted instance group
pool.Ensure([]ServicePort{{Port: 80}}, nil)
pool.Ensure([]ServicePort{{NodePort: 80}}, nil)
be, err = f.GetGlobalBackendService(defaultNamer.Backend(80))
if err != nil {
t.Fatalf("%v", err)
Expand All @@ -445,7 +445,7 @@ func TestBackendCreateBalancingMode(t *testing.T) {
f := NewFakeBackendServices(noOpErrFunc)
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
pool, _ := newTestJig(f, fakeIGs, false)
nodePort := ServicePort{Port: 8080}
sp := ServicePort{NodePort: 8080}
modes := []BalancingMode{Rate, Utilization}

// block the creation of Backends with the given balancingMode
Expand All @@ -461,8 +461,8 @@ func TestBackendCreateBalancingMode(t *testing.T) {
return nil
}

pool.Ensure([]ServicePort{nodePort}, nil)
be, err := f.GetGlobalBackendService(defaultNamer.Backend(nodePort.Port))
pool.Ensure([]ServicePort{sp}, nil)
be, err := f.GetGlobalBackendService(defaultNamer.Backend(sp.NodePort))
if err != nil {
t.Fatalf("%v", err)
}
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
bp := NewBackendPool(f, fakeNEG, healthChecks, nodePool, defaultNamer, []int64{}, false)

svcPort := ServicePort{
Port: 30001,
NodePort: 30001,
Protocol: annotations.ProtocolHTTP,
SvcName: types.NamespacedName{
Namespace: namespace,
Expand All @@ -542,7 +542,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) {
t.Fatalf("Failed to link backend service to NEG: %v", err)
}

bs, err := f.GetGlobalBackendService(defaultNamer.Backend(svcPort.Port))
bs, err := f.GetGlobalBackendService(defaultNamer.Backend(svcPort.NodePort))
if err != nil {
t.Fatalf("Failed to retrieve backend service: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeName
func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
ports := []int64{}
for _, p := range servicePorts {
ports = append(ports, p.Port)
ports = append(ports, p.NodePort)
}
igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
return igs, err
Expand Down Expand Up @@ -221,7 +221,7 @@ func NewClusterManager(
cluster.healthCheckers = []healthchecks.HealthChecker{healthChecker, defaultBackendHealthChecker}

// TODO: This needs to change to a consolidated management of the default backend.
cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.Port}, true)
cluster.backendPool = backends.NewBackendPool(cloud, cloud, healthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{defaultBackendNodePort.NodePort}, true)
defaultBackendPool := backends.NewBackendPool(cloud, cloud, defaultBackendHealthChecker, cluster.instancePool, cluster.ClusterNamer, []int64{}, false)
cluster.defaultBackendNodePort = defaultBackendNodePort

Expand Down
Loading

0 comments on commit 19fb049

Please sign in to comment.