Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert removal of support for TCP and UDP services #3374

Merged
merged 4 commits into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cmd/nginx/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ Takes the form "namespace/name". When used together with update-status, the
controller mirrors the address of this service's endpoints to the load-balancer
status of all Ingress objects it satisfies.`)

tcpConfigMapName = flags.String("tcp-services-configmap", "",
`Name of the ConfigMap containing the definition of the TCP services to expose.
The key in the map indicates the external port to be used. The value is a
reference to a Service in the form "namespace/name:port", where "port" can
either be a port number or name. TCP ports 80 and 443 are reserved by the
controller for servicing HTTP traffic.`)
udpConfigMapName = flags.String("udp-services-configmap", "",
`Name of the ConfigMap containing the definition of the UDP services to expose.
The key in the map indicates the external port to be used. The value is a
reference to a Service in the form "namespace/name:port", where "port" can
either be a port name or number.`)

resyncPeriod = flags.Duration("sync-period", 0,
`Period at which the controller forces the repopulation of its local object stores. Disabled by default.`)

Expand Down Expand Up @@ -217,6 +229,8 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en
DefaultService: *defaultSvc,
Namespace: *watchNamespace,
ConfigMapName: *configMap,
TCPConfigMapName: *tcpConfigMapName,
UDPConfigMapName: *udpConfigMapName,
DefaultSSLCertificate: *defSSLCertificate,
DefaultHealthzURL: *defHealthzURL,
HealthCheckTimeout: *healthCheckTimeout,
Expand Down
17 changes: 17 additions & 0 deletions deploy/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,21 @@ metadata:
app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
name: tcp-services
namespace: ingress-nginx
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/part-of: ingress-nginx

---
kind: ConfigMap
apiVersion: v1
metadata:
name: udp-services
namespace: ingress-nginx
labels:
app.kubernetes.io/name: ingress-nginx
app.kubernetes.io/part-of: ingress-nginx
6 changes: 3 additions & 3 deletions deploy/mandatory.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ metadata:
name: ingress-nginx

---

kind: ConfigMap
apiVersion: v1
metadata:
Expand All @@ -15,7 +14,6 @@ metadata:
app.kubernetes.io/part-of: ingress-nginx

---

apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down Expand Up @@ -162,7 +160,6 @@ subjects:
namespace: ingress-nginx

---

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
Expand Down Expand Up @@ -193,6 +190,8 @@ spec:
args:
- /nginx-ingress-controller
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --publish-service=$(POD_NAMESPACE)/ingress-nginx
- --annotations-prefix=nginx.ingress.kubernetes.io
securityContext:
Expand Down Expand Up @@ -238,3 +237,4 @@ spec:
timeoutSeconds: 1

---

2 changes: 2 additions & 0 deletions deploy/with-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
args:
- /nginx-ingress-controller
- --configmap=$(POD_NAMESPACE)/nginx-configuration
- --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services
- --udp-services-configmap=$(POD_NAMESPACE)/udp-services
- --publish-service=$(POD_NAMESPACE)/ingress-nginx
- --annotations-prefix=nginx.ingress.kubernetes.io
securityContext:
Expand Down
2 changes: 2 additions & 0 deletions internal/ingress/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,8 @@ type TemplateConfig struct {
Backends []*ingress.Backend
PassthroughBackends []*ingress.SSLPassthroughBackend
Servers []*ingress.Server
TCPBackends []ingress.L4Service
UDPBackends []ingress.L4Service
HealthzURI string
CustomErrors bool
Cfg Configuration
Expand Down
122 changes: 122 additions & 0 deletions internal/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/rand"
"sort"
"strconv"
"strings"
"time"

"github.com/golang/glog"
Expand Down Expand Up @@ -60,6 +61,11 @@ type Configuration struct {

ForceNamespaceIsolation bool

// +optional
TCPConfigMapName string
// +optional
UDPConfigMapName string

DefaultHealthzURL string
HealthCheckTimeout time.Duration
DefaultSSLCertificate string
Expand Down Expand Up @@ -151,6 +157,8 @@ func (n *NGINXController) syncIngress(interface{}) error {
pcfg := &ingress.Configuration{
Backends: upstreams,
Servers: servers,
TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP),
UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP),
PassthroughBackends: passUpstreams,
BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum,
}
Expand Down Expand Up @@ -216,6 +224,120 @@ func (n *NGINXController) syncIngress(interface{}) error {
return nil
}

func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service {
if configmapName == "" {
return []ingress.L4Service{}
}
glog.V(3).Infof("Obtaining information about %v stream services from ConfigMap %q", proto, configmapName)
_, _, err := k8s.ParseNameNS(configmapName)
if err != nil {
glog.Errorf("Error parsing ConfigMap reference %q: %v", configmapName, err)
return []ingress.L4Service{}
}
configmap, err := n.store.GetConfigMap(configmapName)
if err != nil {
glog.Errorf("Error getting ConfigMap %q: %v", configmapName, err)
return []ingress.L4Service{}
}
var svcs []ingress.L4Service
var svcProxyProtocol ingress.ProxyProtocol
rp := []int{
n.cfg.ListenPorts.HTTP,
n.cfg.ListenPorts.HTTPS,
n.cfg.ListenPorts.SSLProxy,
n.cfg.ListenPorts.Status,
n.cfg.ListenPorts.Health,
n.cfg.ListenPorts.Default,
}
reserverdPorts := sets.NewInt(rp...)
// svcRef format: <(str)namespace>/<(str)service>:<(intstr)port>[:<("PROXY")decode>:<("PROXY")encode>]
for port, svcRef := range configmap.Data {
externalPort, err := strconv.Atoi(port)
if err != nil {
glog.Warningf("%q is not a valid %v port number", port, proto)
continue
}
if reserverdPorts.Has(externalPort) {
glog.Warningf("Port %d cannot be used for %v stream services. It is reserved for the Ingress controller.", externalPort, proto)
continue
}
nsSvcPort := strings.Split(svcRef, ":")
if len(nsSvcPort) < 2 {
glog.Warningf("Invalid Service reference %q for %v port %d", svcRef, proto, externalPort)
continue
}
nsName := nsSvcPort[0]
svcPort := nsSvcPort[1]
svcProxyProtocol.Decode = false
svcProxyProtocol.Encode = false
// Proxy Protocol is only compatible with TCP Services
if len(nsSvcPort) >= 3 && proto == apiv1.ProtocolTCP {
if len(nsSvcPort) >= 3 && strings.ToUpper(nsSvcPort[2]) == "PROXY" {
svcProxyProtocol.Decode = true
}
if len(nsSvcPort) == 4 && strings.ToUpper(nsSvcPort[3]) == "PROXY" {
svcProxyProtocol.Encode = true
}
}
svcNs, svcName, err := k8s.ParseNameNS(nsName)
if err != nil {
glog.Warningf("%v", err)
continue
}
svc, err := n.store.GetService(nsName)
if err != nil {
glog.Warningf("Error getting Service %q: %v", nsName, err)
continue
}
var endps []ingress.Endpoint
targetPort, err := strconv.Atoi(svcPort)
if err != nil {
// not a port number, fall back to using port name
glog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Name == svcPort {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
break
}
}
}
} else {
glog.V(3).Infof("Searching Endpoints with %v port number %d for Service %q", proto, targetPort, nsName)
for _, sp := range svc.Spec.Ports {
if sp.Port == int32(targetPort) {
if sp.Protocol == proto {
endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints)
break
}
}
}
}
// stream services cannot contain empty upstreams and there is
// no default backend equivalent
if len(endps) == 0 {
glog.Warningf("Service %q does not have any active Endpoint for %v port %v", nsName, proto, svcPort)
continue
}
svcs = append(svcs, ingress.L4Service{
Port: externalPort,
Backend: ingress.L4Backend{
Name: svcName,
Namespace: svcNs,
Port: intstr.FromString(svcPort),
Protocol: proto,
ProxyProtocol: svcProxyProtocol,
},
Endpoints: endps,
})
}
// Keep upstream order sorted to reduce unnecessary nginx config reloads.
sort.SliceStable(svcs, func(i, j int) bool {
return svcs[i].Port < svcs[j].Port
})
return svcs
}

// getDefaultUpstream returns the upstream associated with the default backend.
// Configures the upstream to return HTTP code 503 in case of error.
func (n *NGINXController) getDefaultUpstream() *ingress.Backend {
Expand Down
56 changes: 54 additions & 2 deletions internal/ingress/controller/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
proxyproto "github.com/armon/go-proxyproto"
"github.com/eapache/channels"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -63,7 +64,8 @@ import (
)

const (
ngxHealthPath = "/healthz"
ngxHealthPath = "/healthz"
nginxStreamSocket = "/tmp/ingress-stream.sock"
)

var (
Expand Down Expand Up @@ -112,6 +114,8 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File
config.EnableSSLChainCompletion,
config.Namespace,
config.ConfigMapName,
config.TCPConfigMapName,
config.UDPConfigMapName,
config.DefaultSSLCertificate,
config.ResyncPeriod,
config.Client,
Expand Down Expand Up @@ -578,6 +582,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
Backends: ingressCfg.Backends,
PassthroughBackends: ingressCfg.PassthroughBackends,
Servers: ingressCfg.Servers,
TCPBackends: ingressCfg.TCPEndpoints,
UDPBackends: ingressCfg.UDPEndpoints,
HealthzURI: ngxHealthPath,
CustomErrors: len(cfg.CustomHTTPErrors) > 0,
Cfg: cfg,
Expand Down Expand Up @@ -789,6 +795,29 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
return err
}

streams := make([]ingress.Backend, 0)
for _, ep := range pcfg.TCPEndpoints {
key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
})
}
for _, ep := range pcfg.UDPEndpoints {
key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String())
streams = append(streams, ingress.Backend{
Name: key,
Endpoints: ep.Endpoints,
Port: intstr.FromInt(ep.Port),
})
}

err = updateStreamConfiguration(streams)
if err != nil {
return err
}

if isDynamicCertificatesEnabled {
err = configureCertificates(pcfg, port)
if err != nil {
Expand All @@ -799,6 +828,30 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif
return nil
}

func updateStreamConfiguration(streams []ingress.Backend) error {
conn, err := net.Dial("unix", nginxStreamSocket)
if err != nil {
return err
}
defer conn.Close()

buf, err := json.Marshal(streams)
if err != nil {
return err
}

_, err = conn.Write(buf)
if err != nil {
return err
}
_, err = fmt.Fprintf(conn, "\r\n")
if err != nil {
return err
}

return nil
}

// configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint
// that is handled by Lua
func configureCertificates(pcfg *ingress.Configuration, port int) error {
Expand All @@ -824,7 +877,6 @@ func post(url string, data interface{}) error {
}

glog.V(2).Infof("Posting to %s", url)

resp, err := http.Post(url, "application/json", bytes.NewReader(buf))
if err != nil {
return err
Expand Down
Loading