diff --git a/cmd/nginx/flags.go b/cmd/nginx/flags.go index 25633ef54b..8afa6bc169 100644 --- a/cmd/nginx/flags.go +++ b/cmd/nginx/flags.go @@ -64,6 +64,18 @@ Takes the form "namespace/name". When used together with update-status, the controller mirrors the address of this service's endpoints to the load-balancer status of all Ingress objects it satisfies.`) + tcpConfigMapName = flags.String("tcp-services-configmap", "", + `Name of the ConfigMap containing the definition of the TCP services to expose. +The key in the map indicates the external port to be used. The value is a +reference to a Service in the form "namespace/name:port", where "port" can +either be a port number or name. TCP ports 80 and 443 are reserved by the +controller for servicing HTTP traffic.`) + udpConfigMapName = flags.String("udp-services-configmap", "", + `Name of the ConfigMap containing the definition of the UDP services to expose. +The key in the map indicates the external port to be used. The value is a +reference to a Service in the form "namespace/name:port", where "port" can +either be a port name or number.`) + resyncPeriod = flags.Duration("sync-period", 0, `Period at which the controller forces the repopulation of its local object stores. Disabled by default.`) @@ -217,6 +229,8 @@ Feature backed by OpenResty Lua libraries. Requires that OCSP stapling is not en DefaultService: *defaultSvc, Namespace: *watchNamespace, ConfigMapName: *configMap, + TCPConfigMapName: *tcpConfigMapName, + UDPConfigMapName: *udpConfigMapName, DefaultSSLCertificate: *defSSLCertificate, DefaultHealthzURL: *defHealthzURL, HealthCheckTimeout: *healthCheckTimeout, diff --git a/deploy/configmap.yaml b/deploy/configmap.yaml index 6e882c664f..99eeb3c19b 100644 --- a/deploy/configmap.yaml +++ b/deploy/configmap.yaml @@ -8,4 +8,21 @@ metadata: app.kubernetes.io/part-of: ingress-nginx --- +kind: ConfigMap +apiVersion: v1 +metadata: + name: tcp-services + namespace: ingress-nginx + labels: + app.kubernetes.io/name: ingress-nginx + app.kubernetes.io/part-of: ingress-nginx +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: udp-services + namespace: ingress-nginx + labels: + app.kubernetes.io/name: ingress-nginx + app.kubernetes.io/part-of: ingress-nginx diff --git a/deploy/mandatory.yaml b/deploy/mandatory.yaml index f8511d2cdd..f7a94574e1 100644 --- a/deploy/mandatory.yaml +++ b/deploy/mandatory.yaml @@ -4,7 +4,6 @@ metadata: name: ingress-nginx --- - kind: ConfigMap apiVersion: v1 metadata: @@ -15,7 +14,6 @@ metadata: app.kubernetes.io/part-of: ingress-nginx --- - apiVersion: v1 kind: ServiceAccount metadata: @@ -162,7 +160,6 @@ subjects: namespace: ingress-nginx --- - apiVersion: extensions/v1beta1 kind: Deployment metadata: @@ -193,6 +190,8 @@ spec: args: - /nginx-ingress-controller - --configmap=$(POD_NAMESPACE)/nginx-configuration + - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services + - --udp-services-configmap=$(POD_NAMESPACE)/udp-services - --publish-service=$(POD_NAMESPACE)/ingress-nginx - --annotations-prefix=nginx.ingress.kubernetes.io securityContext: @@ -238,3 +237,4 @@ spec: timeoutSeconds: 1 --- + diff --git a/deploy/with-rbac.yaml b/deploy/with-rbac.yaml index a9fda77108..636f178bc5 100644 --- a/deploy/with-rbac.yaml +++ b/deploy/with-rbac.yaml @@ -28,6 +28,8 @@ spec: args: - /nginx-ingress-controller - --configmap=$(POD_NAMESPACE)/nginx-configuration + - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services + - --udp-services-configmap=$(POD_NAMESPACE)/udp-services - --publish-service=$(POD_NAMESPACE)/ingress-nginx - --annotations-prefix=nginx.ingress.kubernetes.io securityContext: diff --git a/internal/ingress/controller/config/config.go b/internal/ingress/controller/config/config.go index 60444e5f31..4f87519c62 100644 --- a/internal/ingress/controller/config/config.go +++ b/internal/ingress/controller/config/config.go @@ -709,6 +709,8 @@ type TemplateConfig struct { Backends []*ingress.Backend PassthroughBackends []*ingress.SSLPassthroughBackend Servers []*ingress.Server + TCPBackends []ingress.L4Service + UDPBackends []ingress.L4Service HealthzURI string CustomErrors bool Cfg Configuration diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 62143ac559..591bdfff14 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -21,6 +21,7 @@ import ( "math/rand" "sort" "strconv" + "strings" "time" "github.com/golang/glog" @@ -60,6 +61,11 @@ type Configuration struct { ForceNamespaceIsolation bool + // +optional + TCPConfigMapName string + // +optional + UDPConfigMapName string + DefaultHealthzURL string HealthCheckTimeout time.Duration DefaultSSLCertificate string @@ -151,6 +157,8 @@ func (n *NGINXController) syncIngress(interface{}) error { pcfg := &ingress.Configuration{ Backends: upstreams, Servers: servers, + TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP), + UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP), PassthroughBackends: passUpstreams, BackendConfigChecksum: n.store.GetBackendConfiguration().Checksum, } @@ -216,6 +224,120 @@ func (n *NGINXController) syncIngress(interface{}) error { return nil } +func (n *NGINXController) getStreamServices(configmapName string, proto apiv1.Protocol) []ingress.L4Service { + if configmapName == "" { + return []ingress.L4Service{} + } + glog.V(3).Infof("Obtaining information about %v stream services from ConfigMap %q", proto, configmapName) + _, _, err := k8s.ParseNameNS(configmapName) + if err != nil { + glog.Errorf("Error parsing ConfigMap reference %q: %v", configmapName, err) + return []ingress.L4Service{} + } + configmap, err := n.store.GetConfigMap(configmapName) + if err != nil { + glog.Errorf("Error getting ConfigMap %q: %v", configmapName, err) + return []ingress.L4Service{} + } + var svcs []ingress.L4Service + var svcProxyProtocol ingress.ProxyProtocol + rp := []int{ + n.cfg.ListenPorts.HTTP, + n.cfg.ListenPorts.HTTPS, + n.cfg.ListenPorts.SSLProxy, + n.cfg.ListenPorts.Status, + n.cfg.ListenPorts.Health, + n.cfg.ListenPorts.Default, + } + reserverdPorts := sets.NewInt(rp...) + // svcRef format: <(str)namespace>/<(str)service>:<(intstr)port>[:<("PROXY")decode>:<("PROXY")encode>] + for port, svcRef := range configmap.Data { + externalPort, err := strconv.Atoi(port) + if err != nil { + glog.Warningf("%q is not a valid %v port number", port, proto) + continue + } + if reserverdPorts.Has(externalPort) { + glog.Warningf("Port %d cannot be used for %v stream services. It is reserved for the Ingress controller.", externalPort, proto) + continue + } + nsSvcPort := strings.Split(svcRef, ":") + if len(nsSvcPort) < 2 { + glog.Warningf("Invalid Service reference %q for %v port %d", svcRef, proto, externalPort) + continue + } + nsName := nsSvcPort[0] + svcPort := nsSvcPort[1] + svcProxyProtocol.Decode = false + svcProxyProtocol.Encode = false + // Proxy Protocol is only compatible with TCP Services + if len(nsSvcPort) >= 3 && proto == apiv1.ProtocolTCP { + if len(nsSvcPort) >= 3 && strings.ToUpper(nsSvcPort[2]) == "PROXY" { + svcProxyProtocol.Decode = true + } + if len(nsSvcPort) == 4 && strings.ToUpper(nsSvcPort[3]) == "PROXY" { + svcProxyProtocol.Encode = true + } + } + svcNs, svcName, err := k8s.ParseNameNS(nsName) + if err != nil { + glog.Warningf("%v", err) + continue + } + svc, err := n.store.GetService(nsName) + if err != nil { + glog.Warningf("Error getting Service %q: %v", nsName, err) + continue + } + var endps []ingress.Endpoint + targetPort, err := strconv.Atoi(svcPort) + if err != nil { + // not a port number, fall back to using port name + glog.V(3).Infof("Searching Endpoints with %v port name %q for Service %q", proto, svcPort, nsName) + for _, sp := range svc.Spec.Ports { + if sp.Name == svcPort { + if sp.Protocol == proto { + endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints) + break + } + } + } + } else { + glog.V(3).Infof("Searching Endpoints with %v port number %d for Service %q", proto, targetPort, nsName) + for _, sp := range svc.Spec.Ports { + if sp.Port == int32(targetPort) { + if sp.Protocol == proto { + endps = getEndpoints(svc, &sp, proto, n.store.GetServiceEndpoints) + break + } + } + } + } + // stream services cannot contain empty upstreams and there is + // no default backend equivalent + if len(endps) == 0 { + glog.Warningf("Service %q does not have any active Endpoint for %v port %v", nsName, proto, svcPort) + continue + } + svcs = append(svcs, ingress.L4Service{ + Port: externalPort, + Backend: ingress.L4Backend{ + Name: svcName, + Namespace: svcNs, + Port: intstr.FromString(svcPort), + Protocol: proto, + ProxyProtocol: svcProxyProtocol, + }, + Endpoints: endps, + }) + } + // Keep upstream order sorted to reduce unnecessary nginx config reloads. + sort.SliceStable(svcs, func(i, j int) bool { + return svcs[i].Port < svcs[j].Port + }) + return svcs +} + // getDefaultUpstream returns the upstream associated with the default backend. // Configures the upstream to return HTTP code 503 in case of error. func (n *NGINXController) getDefaultUpstream() *ingress.Backend { diff --git a/internal/ingress/controller/nginx.go b/internal/ingress/controller/nginx.go index eff20a2d3f..391c4eea63 100644 --- a/internal/ingress/controller/nginx.go +++ b/internal/ingress/controller/nginx.go @@ -39,6 +39,7 @@ import ( proxyproto "github.com/armon/go-proxyproto" "github.com/eapache/channels" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -63,7 +64,8 @@ import ( ) const ( - ngxHealthPath = "/healthz" + ngxHealthPath = "/healthz" + nginxStreamSocket = "/tmp/ingress-stream.sock" ) var ( @@ -112,6 +114,8 @@ func NewNGINXController(config *Configuration, mc metric.Collector, fs file.File config.EnableSSLChainCompletion, config.Namespace, config.ConfigMapName, + config.TCPConfigMapName, + config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, @@ -578,6 +582,8 @@ func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error { Backends: ingressCfg.Backends, PassthroughBackends: ingressCfg.PassthroughBackends, Servers: ingressCfg.Servers, + TCPBackends: ingressCfg.TCPEndpoints, + UDPBackends: ingressCfg.UDPEndpoints, HealthzURI: ngxHealthPath, CustomErrors: len(cfg.CustomHTTPErrors) > 0, Cfg: cfg, @@ -789,6 +795,29 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif return err } + streams := make([]ingress.Backend, 0) + for _, ep := range pcfg.TCPEndpoints { + key := fmt.Sprintf("tcp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) + streams = append(streams, ingress.Backend{ + Name: key, + Endpoints: ep.Endpoints, + Port: intstr.FromInt(ep.Port), + }) + } + for _, ep := range pcfg.UDPEndpoints { + key := fmt.Sprintf("udp-%v-%v-%v", ep.Backend.Namespace, ep.Backend.Name, ep.Backend.Port.String()) + streams = append(streams, ingress.Backend{ + Name: key, + Endpoints: ep.Endpoints, + Port: intstr.FromInt(ep.Port), + }) + } + + err = updateStreamConfiguration(streams) + if err != nil { + return err + } + if isDynamicCertificatesEnabled { err = configureCertificates(pcfg, port) if err != nil { @@ -799,6 +828,30 @@ func configureDynamically(pcfg *ingress.Configuration, port int, isDynamicCertif return nil } +func updateStreamConfiguration(streams []ingress.Backend) error { + conn, err := net.Dial("unix", nginxStreamSocket) + if err != nil { + return err + } + defer conn.Close() + + buf, err := json.Marshal(streams) + if err != nil { + return err + } + + _, err = conn.Write(buf) + if err != nil { + return err + } + _, err = fmt.Fprintf(conn, "\r\n") + if err != nil { + return err + } + + return nil +} + // configureCertificates JSON encodes certificates and POSTs it to an internal HTTP endpoint // that is handled by Lua func configureCertificates(pcfg *ingress.Configuration, port int) error { @@ -824,7 +877,6 @@ func post(url string, data interface{}) error { } glog.V(2).Infof("Posting to %s", url) - resp, err := http.Post(url, "application/json", bytes.NewReader(buf)) if err != nil { return err diff --git a/internal/ingress/controller/nginx_test.go b/internal/ingress/controller/nginx_test.go index e2a815c855..b18cb1a928 100644 --- a/internal/ingress/controller/nginx_test.go +++ b/internal/ingress/controller/nginx_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" jsoniter "github.com/json-iterator/go" apiv1 "k8s.io/api/core/v1" @@ -146,7 +147,33 @@ func TestIsDynamicConfigurationEnough(t *testing.T) { } } +func mockUnixSocket(t *testing.T) net.Listener { + l, err := net.Listen("unix", nginxStreamSocket) + if err != nil { + t.Fatalf("unexpected error creating unix socket: %v", err) + } + if l == nil { + t.Fatalf("expected a listener but none returned") + } + + go func() { + for { + conn, err := l.Accept() + if err != nil { + continue + } + + time.Sleep(100 * time.Millisecond) + defer conn.Close() + } + }() + + return l +} func TestConfigureDynamically(t *testing.T) { + l := mockUnixSocket(t) + defer l.Close() + target := &apiv1.ObjectReference{} backends := []*ingress.Backend{{ diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index 547d5ff44c..751246c01d 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -218,7 +218,7 @@ type k8sStore struct { // New creates a new object store to be used in the ingress controller func New(checkOCSP bool, - namespace, configmap, defaultSSLCertificate string, + namespace, configmap, tcp, udp, defaultSSLCertificate string, resyncPeriod time.Duration, client clientset.Interface, fs file.Filesystem, @@ -473,7 +473,7 @@ func New(checkOCSP bool, cm := obj.(*corev1.ConfigMap) key := k8s.MetaNamespaceKey(cm) // updates to configuration configmaps can trigger an update - if key == configmap { + if key == configmap || key == tcp || key == udp { recorder.Eventf(cm, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("ConfigMap %v", key)) if key == configmap { store.setConfig(cm) @@ -489,7 +489,7 @@ func New(checkOCSP bool, cm := cur.(*corev1.ConfigMap) key := k8s.MetaNamespaceKey(cm) // updates to configuration configmaps can trigger an update - if key == configmap { + if key == configmap || key == tcp || key == udp { recorder.Eventf(cm, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("ConfigMap %v", key)) if key == configmap { store.setConfig(cm) diff --git a/internal/ingress/controller/store/store_test.go b/internal/ingress/controller/store/store_test.go index 860ebe651d..5b6e0738f6 100644 --- a/internal/ingress/controller/store/store_test.go +++ b/internal/ingress/controller/store/store_test.go @@ -63,6 +63,8 @@ func TestStore(t *testing.T) { storer := New(true, ns, fmt.Sprintf("%v/config", ns), + fmt.Sprintf("%v/tcp", ns), + fmt.Sprintf("%v/udp", ns), "", 10*time.Minute, clientSet, @@ -149,6 +151,8 @@ func TestStore(t *testing.T) { storer := New(true, ns, fmt.Sprintf("%v/config", ns), + fmt.Sprintf("%v/tcp", ns), + fmt.Sprintf("%v/udp", ns), "", 10*time.Minute, clientSet, @@ -295,6 +299,8 @@ func TestStore(t *testing.T) { storer := New(true, ns, fmt.Sprintf("%v/config", ns), + fmt.Sprintf("%v/tcp", ns), + fmt.Sprintf("%v/udp", ns), "", 10*time.Minute, clientSet, @@ -382,6 +388,8 @@ func TestStore(t *testing.T) { storer := New(true, ns, fmt.Sprintf("%v/config", ns), + fmt.Sprintf("%v/tcp", ns), + fmt.Sprintf("%v/udp", ns), "", 10*time.Minute, clientSet, @@ -492,6 +500,8 @@ func TestStore(t *testing.T) { storer := New(true, ns, fmt.Sprintf("%v/config", ns), + fmt.Sprintf("%v/tcp", ns), + fmt.Sprintf("%v/udp", ns), "", 10*time.Minute, clientSet, diff --git a/internal/ingress/types.go b/internal/ingress/types.go index 77a18cc94b..96835900a9 100644 --- a/internal/ingress/types.go +++ b/internal/ingress/types.go @@ -54,6 +54,12 @@ type Configuration struct { Backends []*Backend `json:"backends,omitempty"` // Servers save the website config Servers []*Server `json:"servers,omitempty"` + // TCPEndpoints contain endpoints for tcp streams handled by this backend + // +optional + TCPEndpoints []L4Service `json:"tcpEndpoints,omitempty"` + // UDPEndpoints contain endpoints for udp streams handled by this backend + // +optional + UDPEndpoints []L4Service `json:"udpEndpoints,omitempty"` // PassthroughBackends contains the backends used for SSL passthrough. // It contains information about the associated Server Name Indication (SNI). // +optional diff --git a/internal/ingress/types_equals.go b/internal/ingress/types_equals.go index f5db890eeb..5b1808c925 100644 --- a/internal/ingress/types_equals.go +++ b/internal/ingress/types_equals.go @@ -53,6 +53,41 @@ func (c1 *Configuration) Equal(c2 *Configuration) bool { } } + if len(c1.TCPEndpoints) != len(c2.TCPEndpoints) { + return false + } + for _, tcp1 := range c1.TCPEndpoints { + found := false + for _, tcp2 := range c2.TCPEndpoints { + if (&tcp1).Equal(&tcp2) { + found = true + break + } + } + if !found { + return false + } + } + + if len(c1.UDPEndpoints) != len(c2.UDPEndpoints) { + return false + } + for _, udp1 := range c1.UDPEndpoints { + found := false + for _, udp2 := range c2.UDPEndpoints { + if (&udp1).Equal(&udp2) { + found = true + break + } + } + if !found { + return false + } + } + + if len(c1.PassthroughBackends) != len(c2.PassthroughBackends) { + return false + } for _, ptb1 := range c1.PassthroughBackends { found := false for _, ptb2 := range c2.PassthroughBackends { diff --git a/rootfs/etc/nginx/lua/configuration.lua b/rootfs/etc/nginx/lua/configuration.lua index 1898796543..9dee8042f3 100644 --- a/rootfs/etc/nginx/lua/configuration.lua +++ b/rootfs/etc/nginx/lua/configuration.lua @@ -47,7 +47,7 @@ local function handle_servers() local ok, servers = pcall(json.decode, raw_servers) if not ok then - ngx.log(ngx.ERR, "could not parse servers: " .. tostring(servers)) + ngx.log(ngx.ERR, "could not parse servers: " .. tostring(servers)) ngx.status = ngx.HTTP_BAD_REQUEST return end @@ -63,8 +63,7 @@ local function handle_servers() return end - local err_msg = string.format("error setting certificate for %s: %s\n", - server.hostname, tostring(err)) + local err_msg = string.format("error setting certificate for %s: %s\n", server.hostname, tostring(err)) table.insert(err_buf, err_msg) end else diff --git a/rootfs/etc/nginx/lua/tcp_udp_balancer.lua b/rootfs/etc/nginx/lua/tcp_udp_balancer.lua new file mode 100644 index 0000000000..75d29c5bd7 --- /dev/null +++ b/rootfs/etc/nginx/lua/tcp_udp_balancer.lua @@ -0,0 +1,177 @@ +local ngx_balancer = require("ngx.balancer") +local json = require("cjson") +local util = require("util") +local dns_util = require("util.dns") +local configuration = require("tcp_udp_configuration") +local round_robin = require("balancer.round_robin") + +-- measured in seconds +-- for an Nginx worker to pick up the new list of upstream peers +-- it will take + BACKENDS_SYNC_INTERVAL +local BACKENDS_SYNC_INTERVAL = 1 + +local DEFAULT_LB_ALG = "round_robin" +local IMPLEMENTATIONS = { + round_robin = round_robin +} + +local _M = {} +local balancers = {} + +local function get_implementation(backend) + local name = backend["load-balance"] or DEFAULT_LB_ALG + + local implementation = IMPLEMENTATIONS[name] + if not implementation then + ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", backend["load-balance"], DEFAULT_LB_ALG)) + implementation = IMPLEMENTATIONS[DEFAULT_LB_ALG] + end + + return implementation +end + +local function resolve_external_names(original_backend) + local backend = util.deepcopy(original_backend) + local endpoints = {} + for _, endpoint in ipairs(backend.endpoints) do + local ips = dns_util.resolve(endpoint.address) + for _, ip in ipairs(ips) do + table.insert(endpoints, {address = ip, port = endpoint.port}) + end + end + backend.endpoints = endpoints + return backend +end + +local function format_ipv6_endpoints(endpoints) + local formatted_endpoints = {} + for _, endpoint in ipairs(endpoints) do + local formatted_endpoint = endpoint + if not endpoint.address:match("^%d+.%d+.%d+.%d+$") then + formatted_endpoint.address = string.format("[%s]", endpoint.address) + end + table.insert(formatted_endpoints, formatted_endpoint) + end + return formatted_endpoints +end + +local function sync_backend(backend) + if not backend.endpoints or #backend.endpoints == 0 then + ngx.log(ngx.INFO, string.format("there is no endpoint for backend %s. Skipping...", backend.name)) + return + end + + ngx.log(ngx.INFO, string.format("backend ", backend.name)) + local implementation = get_implementation(backend) + local balancer = balancers[backend.name] + + if not balancer then + balancers[backend.name] = implementation:new(backend) + return + end + + -- every implementation is the metatable of its instances (see .new(...) functions) + -- here we check if `balancer` is the instance of `implementation` + -- if it is not then we deduce LB algorithm has changed for the backend + if getmetatable(balancer) ~= implementation then + ngx.log( + ngx.INFO, + string.format("LB algorithm changed from %s to %s, resetting the instance", balancer.name, implementation.name) + ) + balancers[backend.name] = implementation:new(backend) + return + end + + local service_type = backend.service and backend.service.spec and backend.service.spec["type"] + if service_type == "ExternalName" then + backend = resolve_external_names(backend) + end + + backend.endpoints = format_ipv6_endpoints(backend.endpoints) + + balancer:sync(backend) +end + +local function sync_backends() + local backends_data = configuration.get_backends_data() + if not backends_data then + balancers = {} + return + end + + local ok, new_backends = pcall(json.decode, backends_data) + if not ok then + ngx.log(ngx.ERR, "could not parse backends data: " .. tostring(new_backends)) + return + end + + local balancers_to_keep = {} + for _, new_backend in ipairs(new_backends) do + sync_backend(new_backend) + balancers_to_keep[new_backend.name] = balancers[new_backend.name] + end + + for backend_name, _ in pairs(balancers) do + if not balancers_to_keep[backend_name] then + balancers[backend_name] = nil + end + end +end + +local function get_balancer() + local backend_name = ngx.var.proxy_upstream_name + local balancer = balancers[backend_name] + if not balancer then + return + end + + return balancer +end + +function _M.init_worker() + sync_backends() -- when worker starts, sync backends without delay + local _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends) + if err then + ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err))) + end +end + +function _M.balance() + local balancer = get_balancer() + if not balancer then + return + end + + local peer = balancer:balance() + if not peer then + ngx.log(ngx.WARN, "no peer was returned, balancer: " .. balancer.name) + return + end + + ngx_balancer.set_more_tries(1) + + local ok, err = ngx_balancer.set_current_peer(peer) + if not ok then + ngx.log(ngx.ERR, string.format("error while setting current upstream peer %s: %s", peer, err)) + end +end + +function _M.log() + local balancer = get_balancer() + if not balancer then + return + end + + if not balancer.after_balance then + return + end + + balancer:after_balance() +end + +if _TEST then + _M.get_implementation = get_implementation + _M.sync_backend = sync_backend +end + +return _M diff --git a/rootfs/etc/nginx/lua/tcp_udp_configuration.lua b/rootfs/etc/nginx/lua/tcp_udp_configuration.lua new file mode 100644 index 0000000000..8cc4111cf3 --- /dev/null +++ b/rootfs/etc/nginx/lua/tcp_udp_configuration.lua @@ -0,0 +1,40 @@ +-- this is the Lua representation of TCP/UDP Configuration +local tcp_udp_configuration_data = ngx.shared.tcp_udp_configuration_data + +local _M = { + nameservers = {} +} + +function _M.get_backends_data() + return tcp_udp_configuration_data:get("backends") +end + +function _M.call() + local sock, err = ngx.req.socket(true) + if not sock then + ngx.log(ngx.ERR, "failed to get raw req socket: ", err) + ngx.say("error: ", err) + return + end + + local reader = sock:receiveuntil("\r\n") + local backends, err_read = reader() + if not backends then + ngx.log(ngx.ERR, "failed TCP/UDP dynamic-configuration:", err_read) + ngx.say("error: ", err_read) + return + end + + if backends == nil or backends == "" then + return + end + + local success, err_conf = tcp_udp_configuration_data:set("backends", backends) + if not success then + ngx.log(ngx.ERR, "dynamic-configuration: error updating configuration: " .. tostring(err_conf)) + ngx.say("error: ", err_conf) + return + end +end + +return _M diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index bf7f4bfbf5..5842bff6d0 100644 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -680,6 +680,40 @@ http { } stream { + lua_package_cpath "/usr/local/lib/lua/?.so;/usr/lib/lua-platform-path/lua/5.1/?.so;;"; + lua_package_path "/etc/nginx/lua/?.lua;/etc/nginx/lua/vendor/?.lua;/usr/local/lib/lua/?.lua;;"; + + lua_shared_dict tcp_udp_configuration_data 5M; + + init_by_lua_block { + require("resty.core") + collectgarbage("collect") + + -- init modules + local ok, res + + ok, res = pcall(require, "tcp_udp_configuration") + if not ok then + error("require failed: " .. tostring(res)) + else + tcp_udp_configuration = res + tcp_udp_configuration.nameservers = { {{ buildResolversForLua $cfg.Resolver $cfg.DisableIpv6DNS }} } + end + + ok, res = pcall(require, "tcp_udp_balancer") + if not ok then + error("require failed: " .. tostring(res)) + else + tcp_udp_balancer = res + end + } + + init_worker_by_lua_block { + tcp_udp_balancer.init_worker() + } + + lua_add_variable $proxy_upstream_name; + log_format log_stream {{ $cfg.LogFormatStream }}; {{ if $cfg.DisableAccessLog }} @@ -689,6 +723,74 @@ stream { {{ end }} error_log {{ $cfg.ErrorLogPath }}; + + upstream upstream_balancer { + server 0.0.0.1:1234; # placeholder + + balancer_by_lua_block { + tcp_udp_balancer.balance() + } + } + + server { + listen unix:/tmp/ingress-stream.sock; + + content_by_lua_block { + tcp_udp_configuration.call() + } + } + + # TCP services + {{ range $tcpServer := .TCPBackends }} + server { + preread_by_lua_block { + ngx.var.proxy_upstream_name="tcp-{{ $tcpServer.Backend.Namespace }}-{{ $tcpServer.Backend.Name }}-{{ $tcpServer.Backend.Port }}"; + } + + {{ range $address := $all.Cfg.BindAddressIpv4 }} + listen {{ $address }}:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }}; + {{ else }} + listen {{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }}; + {{ end }} + {{ if $IsIPV6Enabled }} + {{ range $address := $all.Cfg.BindAddressIpv6 }} + listen {{ $address }}:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }}; + {{ else }} + listen [::]:{{ $tcpServer.Port }}{{ if $tcpServer.Backend.ProxyProtocol.Decode }} proxy_protocol{{ end }}; + {{ end }} + {{ end }} + proxy_timeout {{ $cfg.ProxyStreamTimeout }}; + proxy_pass upstream_balancer; + {{ if $tcpServer.Backend.ProxyProtocol.Encode }} + proxy_protocol on; + {{ end }} + } + {{ end }} + + # UDP services + {{ range $udpServer := .UDPBackends }} + server { + preread_by_lua_block { + ngx.var.proxy_upstream_name="udp-{{ $udpServer.Backend.Namespace }}-{{ $udpServer.Backend.Name }}-{{ $udpServer.Backend.Port }}"; + } + + {{ range $address := $all.Cfg.BindAddressIpv4 }} + listen {{ $address }}:{{ $udpServer.Port }} udp; + {{ else }} + listen {{ $udpServer.Port }} udp; + {{ end }} + {{ if $IsIPV6Enabled }} + {{ range $address := $all.Cfg.BindAddressIpv6 }} + listen {{ $address }}:{{ $udpServer.Port }} udp; + {{ else }} + listen [::]:{{ $udpServer.Port }} udp; + {{ end }} + {{ end }} + proxy_responses {{ $cfg.ProxyStreamResponses }}; + proxy_timeout {{ $cfg.ProxyStreamTimeout }}; + proxy_pass upstream_balancer; + } + {{ end }} } {{/* definition of templates to avoid repetitions */}} @@ -923,7 +1025,7 @@ stream { waf:set_option("mode", "{{ $location.LuaRestyWAF.Mode }}") waf:set_option("storage_zone", "waf_storage") - {{ if $location.LuaRestyWAF.AllowUnknownContentTypes }} + {{ if $location.LuaRestyWAF.AllowUnknownContentTypes }} waf:set_option("allow_unknown_content_types", true) {{ else }} waf:set_option("allowed_content_types", { "text/html", "text/json", "application/json" }) diff --git a/test/e2e/annotations/affinity.go b/test/e2e/annotations/affinity.go index 6e175b6c3b..0e9a0da403 100644 --- a/test/e2e/annotations/affinity.go +++ b/test/e2e/annotations/affinity.go @@ -68,7 +68,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", match := md5Regex.FindStringSubmatch(resp.Header.Get("Set-Cookie")) Expect(len(match)).Should(BeNumerically("==", 1)) - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Set-Cookie")).Should(ContainSubstring(match[0])) }) @@ -97,7 +97,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", match := sha1Regex.FindStringSubmatch(resp.Header.Get("Set-Cookie")) Expect(len(match)).Should(BeNumerically("==", 1)) - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Set-Cookie")).Should(ContainSubstring(match[0])) }) @@ -122,7 +122,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Set-Cookie")).Should(ContainSubstring("Path=/something")) }) @@ -179,7 +179,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Set-Cookie")).Should(ContainSubstring("Path=/something;")) @@ -188,7 +188,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Set-Cookie")).Should(ContainSubstring("Path=/somewhereelese;")) }) @@ -215,7 +215,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Affinity/Sticky Sessions", Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) local, _ := time.LoadLocation("GMT") duration, _ := time.ParseDuration("48h") diff --git a/test/e2e/annotations/alias.go b/test/e2e/annotations/alias.go index 2ebda7b5e3..a37026b4be 100644 --- a/test/e2e/annotations/alias.go +++ b/test/e2e/annotations/alias.go @@ -55,7 +55,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Alias", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("host=%v", host))) @@ -64,7 +64,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Alias", func() { Set("Host", "bar"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusNotFound)) Expect(body).Should(ContainSubstring("404 Not Found")) }) @@ -91,7 +91,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Alias", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("host=%v", host))) } diff --git a/test/e2e/annotations/approot.go b/test/e2e/annotations/approot.go index 5c6d2942a3..9a325202d2 100644 --- a/test/e2e/annotations/approot.go +++ b/test/e2e/annotations/approot.go @@ -59,7 +59,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Approot", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusFound)) Expect(resp.Header.Get("Location")).Should(Equal("http://approot.bar.com/foo")) }) diff --git a/test/e2e/annotations/auth.go b/test/e2e/annotations/auth.go index 4e1286373c..fdc6a59c41 100644 --- a/test/e2e/annotations/auth.go +++ b/test/e2e/annotations/auth.go @@ -63,7 +63,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("host=%v", host))) }) @@ -91,7 +91,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusServiceUnavailable)) Expect(body).Should(ContainSubstring("503 Service Temporarily Unavailable")) }) @@ -122,7 +122,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusUnauthorized)) Expect(body).Should(ContainSubstring("401 Authorization Required")) }) @@ -154,7 +154,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { SetBasicAuth("user", "pass"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusUnauthorized)) Expect(body).Should(ContainSubstring("401 Authorization Required")) }) @@ -186,7 +186,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { SetBasicAuth("foo", "bar"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) @@ -229,7 +229,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Auth", func() { SetBasicAuth("foo", "bar"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusInternalServerError)) }) diff --git a/test/e2e/annotations/authtls.go b/test/e2e/annotations/authtls.go index 89f1de4a63..d489f842e1 100644 --- a/test/e2e/annotations/authtls.go +++ b/test/e2e/annotations/authtls.go @@ -81,7 +81,7 @@ var _ = framework.IngressNginxDescribe("Annotations - AuthTLS", func() { TLSClientConfig(&tls.Config{ServerName: host, InsecureSkipVerify: true}). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusBadRequest)) // Send Request Passing the Client Certs @@ -90,7 +90,7 @@ var _ = framework.IngressNginxDescribe("Annotations - AuthTLS", func() { TLSClientConfig(clientConfig). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) @@ -135,7 +135,7 @@ var _ = framework.IngressNginxDescribe("Annotations - AuthTLS", func() { TLSClientConfig(&tls.Config{ServerName: host, InsecureSkipVerify: true}). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) @@ -191,7 +191,7 @@ var _ = framework.IngressNginxDescribe("Annotations - AuthTLS", func() { Set("Host", host). RedirectPolicy(noRedirectPolicyFunc). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusFound)) Expect(resp.Header.Get("Location")).Should(Equal(f.IngressController.HTTPURL + errorPath)) @@ -201,7 +201,7 @@ var _ = framework.IngressNginxDescribe("Annotations - AuthTLS", func() { TLSClientConfig(clientConfig). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) }) diff --git a/test/e2e/annotations/canary.go b/test/e2e/annotations/canary.go index b2c5186678..6201eadb2f 100644 --- a/test/e2e/annotations/canary.go +++ b/test/e2e/annotations/canary.go @@ -76,7 +76,7 @@ var _ = framework.IngressNginxDescribe("Annotations - canary", func() { Set("CanaryByHeader", "always"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).ShouldNot(Equal(http.StatusNotFound)) Expect(body).Should(ContainSubstring("http-svc-canary")) }) @@ -113,7 +113,7 @@ var _ = framework.IngressNginxDescribe("Annotations - canary", func() { Set("CanaryByHeader", "never"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).ShouldNot(Equal(http.StatusNotFound)) Expect(body).ShouldNot(ContainSubstring("http-svc-canary")) }) @@ -152,7 +152,7 @@ var _ = framework.IngressNginxDescribe("Annotations - canary", func() { AddCookie(&http.Cookie{Name: "CanaryByCookie", Value: "always"}). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).ShouldNot(Equal(http.StatusNotFound)) Expect(body).Should(ContainSubstring("http-svc-canary")) }) @@ -191,7 +191,7 @@ var _ = framework.IngressNginxDescribe("Annotations - canary", func() { AddCookie(&http.Cookie{Name: "CanaryByCookie", Value: "never"}). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).ShouldNot(Equal(http.StatusNotFound)) Expect(body).ShouldNot(ContainSubstring("http-svc-canary")) }) diff --git a/test/e2e/annotations/connection.go b/test/e2e/annotations/connection.go index c281a2a69a..6ca77cce78 100644 --- a/test/e2e/annotations/connection.go +++ b/test/e2e/annotations/connection.go @@ -57,7 +57,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Connection", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("connection=keep-alive"))) }) diff --git a/test/e2e/annotations/cors.go b/test/e2e/annotations/cors.go index 8da735c61d..4fb4c9d4fc 100644 --- a/test/e2e/annotations/cors.go +++ b/test/e2e/annotations/cors.go @@ -75,7 +75,7 @@ var _ = framework.IngressNginxDescribe("Annotations - CORS", func() { Options(f.IngressController.HTTPURL+uri). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusNoContent)) }) diff --git a/test/e2e/annotations/default_backend.go b/test/e2e/annotations/default_backend.go index a1d7967f9e..0d105c3862 100644 --- a/test/e2e/annotations/default_backend.go +++ b/test/e2e/annotations/default_backend.go @@ -58,7 +58,7 @@ var _ = framework.IngressNginxDescribe("Annotations - custom default-backend", f Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).To(ContainSubstring("x-code=503")) diff --git a/test/e2e/annotations/forcesslredirect.go b/test/e2e/annotations/forcesslredirect.go index 590bf3c311..e46eb6465f 100644 --- a/test/e2e/annotations/forcesslredirect.go +++ b/test/e2e/annotations/forcesslredirect.go @@ -59,7 +59,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Forcesslredirect", func() Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusPermanentRedirect)) Expect(resp.Header.Get("Location")).Should(Equal("https://forcesslredirect.bar.com/")) }) diff --git a/test/e2e/annotations/fromtowwwredirect.go b/test/e2e/annotations/fromtowwwredirect.go index 6d1e597ed0..d5bc376c83 100644 --- a/test/e2e/annotations/fromtowwwredirect.go +++ b/test/e2e/annotations/fromtowwwredirect.go @@ -63,7 +63,7 @@ var _ = framework.IngressNginxDescribe("Annotations - Fromtowwwredirect", func() Set("Host", fmt.Sprintf("%s.%s", "www", host)). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusPermanentRedirect)) Expect(resp.Header.Get("Location")).Should(Equal("http://fromtowwwredirect.bar.com/foo")) }) diff --git a/test/e2e/defaultbackend/default_backend.go b/test/e2e/defaultbackend/default_backend.go index f957bbfe18..3e4b6b87b2 100644 --- a/test/e2e/defaultbackend/default_backend.go +++ b/test/e2e/defaultbackend/default_backend.go @@ -94,7 +94,7 @@ var _ = framework.IngressNginxDescribe("Default backend", func() { } resp, _, errs := cm.End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(test.Status)) } }) diff --git a/test/e2e/defaultbackend/ssl.go b/test/e2e/defaultbackend/ssl.go index 7dda5ce319..80f2be1850 100644 --- a/test/e2e/defaultbackend/ssl.go +++ b/test/e2e/defaultbackend/ssl.go @@ -44,7 +44,7 @@ var _ = framework.IngressNginxDescribe("Default backend - SSL", func() { InsecureSkipVerify: true, }).End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(len(resp.TLS.PeerCertificates)).Should(BeNumerically("==", 1)) for _, pc := range resp.TLS.PeerCertificates { @@ -59,7 +59,7 @@ var _ = framework.IngressNginxDescribe("Default backend - SSL", func() { InsecureSkipVerify: true, }). Set("Host", "foo.bar.com").End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(len(resp.TLS.PeerCertificates)).Should(BeNumerically("==", 1)) for _, pc := range resp.TLS.PeerCertificates { Expect(pc.Issuer.CommonName).Should(Equal("Kubernetes Ingress Controller Fake Certificate")) diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go index ca5f7e8605..ea52f9afa9 100644 --- a/test/e2e/e2e.go +++ b/test/e2e/e2e.go @@ -39,6 +39,7 @@ import ( _ "k8s.io/ingress-nginx/test/e2e/settings" _ "k8s.io/ingress-nginx/test/e2e/ssl" _ "k8s.io/ingress-nginx/test/e2e/status" + _ "k8s.io/ingress-nginx/test/e2e/tcpudp" ) // RunE2ETests checks configuration parameters (specified through flags) and then runs diff --git a/test/e2e/servicebackend/service_backend.go b/test/e2e/servicebackend/service_backend.go index 7e28d03151..3df7e89964 100644 --- a/test/e2e/servicebackend/service_backend.go +++ b/test/e2e/servicebackend/service_backend.go @@ -56,7 +56,7 @@ var _ = framework.IngressNginxDescribe("Service backend - 503", func() { Get(f.IngressController.HTTPURL). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(503)) }) @@ -79,7 +79,7 @@ var _ = framework.IngressNginxDescribe("Service backend - 503", func() { Get(f.IngressController.HTTPURL). Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(503)) }) diff --git a/test/e2e/settings/forwarded_headers.go b/test/e2e/settings/forwarded_headers.go index fe451655e5..b08ee075ee 100644 --- a/test/e2e/settings/forwarded_headers.go +++ b/test/e2e/settings/forwarded_headers.go @@ -63,7 +63,7 @@ var _ = framework.IngressNginxDescribe("X-Forwarded headers", func() { Set("X-Forwarded-Host", "myhost"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("host=myhost"))) Expect(body).Should(ContainSubstring(fmt.Sprintf("x-forwarded-host=myhost"))) @@ -92,7 +92,7 @@ var _ = framework.IngressNginxDescribe("X-Forwarded headers", func() { Set("X-Forwarded-Host", "myhost"). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(body).Should(ContainSubstring(fmt.Sprintf("host=forwarded-headers"))) Expect(body).Should(ContainSubstring(fmt.Sprintf("x-forwarded-port=80"))) diff --git a/test/e2e/settings/no_auth_locations.go b/test/e2e/settings/no_auth_locations.go index a1eb741fc3..546df30432 100644 --- a/test/e2e/settings/no_auth_locations.go +++ b/test/e2e/settings/no_auth_locations.go @@ -67,7 +67,7 @@ var _ = framework.IngressNginxDescribe("No Auth locations", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusUnauthorized)) Expect(body).Should(ContainSubstring("401 Authorization Required")) }) @@ -84,7 +84,7 @@ var _ = framework.IngressNginxDescribe("No Auth locations", func() { SetBasicAuth(username, password). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) @@ -99,7 +99,7 @@ var _ = framework.IngressNginxDescribe("No Auth locations", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) }) }) diff --git a/test/e2e/settings/tls.go b/test/e2e/settings/tls.go index 2247ebff9d..6139f6b7d8 100644 --- a/test/e2e/settings/tls.go +++ b/test/e2e/settings/tls.go @@ -67,7 +67,7 @@ var _ = framework.IngressNginxDescribe("Settings - TLS)", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.TLS.Version).Should(BeNumerically("==", tls.VersionTLS12)) Expect(resp.TLS.CipherSuite).Should(BeNumerically("==", tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384)) @@ -86,7 +86,7 @@ var _ = framework.IngressNginxDescribe("Settings - TLS)", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.TLS.Version).Should(BeNumerically("==", tls.VersionTLS10)) Expect(resp.TLS.CipherSuite).Should(BeNumerically("==", tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA)) @@ -116,7 +116,7 @@ var _ = framework.IngressNginxDescribe("Settings - TLS)", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Strict-Transport-Security")).Should(ContainSubstring("max-age=86400")) @@ -134,7 +134,7 @@ var _ = framework.IngressNginxDescribe("Settings - TLS)", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Strict-Transport-Security")).ShouldNot(ContainSubstring("includeSubDomains")) @@ -152,7 +152,7 @@ var _ = framework.IngressNginxDescribe("Settings - TLS)", func() { Set("Host", host). End() - Expect(len(errs)).Should(BeNumerically("==", 0)) + Expect(errs).Should(BeEmpty()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) Expect(resp.Header.Get("Strict-Transport-Security")).Should(ContainSubstring("preload")) }) diff --git a/test/e2e/tcpudp/tcp.go b/test/e2e/tcpudp/tcp.go new file mode 100644 index 0000000000..cebb90be27 --- /dev/null +++ b/test/e2e/tcpudp/tcp.go @@ -0,0 +1,97 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package settings + +import ( + "fmt" + "strings" + + "github.com/parnurzeal/gorequest" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/ingress-nginx/test/e2e/framework" +) + +var _ = framework.IngressNginxDescribe("TCP Feature", func() { + f := framework.NewDefaultFramework("tcp") + + BeforeEach(func() { + }) + + AfterEach(func() { + }) + + It("should expose a TCP service", func() { + f.NewEchoDeploymentWithReplicas(1) + + config, err := f.KubeClientSet. + CoreV1(). + ConfigMaps(f.IngressController.Namespace). + Get("tcp-services", metav1.GetOptions{}) + Expect(err).To(BeNil(), "unexpected error obtaining tcp-services configmap") + Expect(config).NotTo(BeNil(), "expected a configmap but none returned") + + if config.Data == nil { + config.Data = map[string]string{} + } + + config.Data["8080"] = fmt.Sprintf("%v/http-svc:80", f.IngressController.Namespace) + _, err = f.KubeClientSet. + CoreV1(). + ConfigMaps(f.IngressController.Namespace). + Update(config) + Expect(err).NotTo(HaveOccurred(), "unexpected error updating configmap") + + svc, err := f.KubeClientSet. + CoreV1(). + Services(f.IngressController.Namespace). + Get("ingress-nginx", metav1.GetOptions{}) + Expect(err).To(BeNil(), "unexpected error obtaining ingress-nginx service") + Expect(svc).NotTo(BeNil(), "expected a service but none returned") + + svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{ + Name: "http-svc", + Port: 8080, + TargetPort: intstr.FromInt(8080), + }) + _, err = f.KubeClientSet. + CoreV1(). + Services(f.IngressController.Namespace). + Update(svc) + Expect(err).NotTo(HaveOccurred(), "unexpected error updating service") + + f.WaitForNginxConfiguration( + func(cfg string) bool { + return strings.Contains(cfg, fmt.Sprintf(`ngx.var.proxy_upstream_name="tcp-%v-http-svc-80"`, f.IngressController.Namespace)) + }) + + ip := f.GetNginxIP() + port, err := f.GetNginxPort("http-svc") + Expect(err).NotTo(HaveOccurred(), "unexpected error obtaning service port") + + resp, _, errs := gorequest.New(). + Get(fmt.Sprintf("http://%v:%v", ip, port)). + End() + Expect(errs).Should(BeEmpty()) + Expect(resp.StatusCode).Should(Equal(200)) + }) +}) diff --git a/test/manifests/ingress-controller/mandatory.yaml b/test/manifests/ingress-controller/mandatory.yaml index c960bb0ff4..eae42950f2 100644 --- a/test/manifests/ingress-controller/mandatory.yaml +++ b/test/manifests/ingress-controller/mandatory.yaml @@ -5,6 +5,25 @@ metadata: labels: app.kubernetes.io/name: ingress-nginx app.kubernetes.io/part-of: ingress-nginx + +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: tcp-services + labels: + app.kubernetes.io/name: ingress-nginx + app.kubernetes.io/part-of: ingress-nginx + +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: udp-services + labels: + app.kubernetes.io/name: ingress-nginx + app.kubernetes.io/part-of: ingress-nginx + --- kind: ConfigMap apiVersion: v1 @@ -206,6 +225,8 @@ spec: args: - /nginx-ingress-controller - --configmap=$(POD_NAMESPACE)/nginx-configuration + - --tcp-services-configmap=$(POD_NAMESPACE)/tcp-services + - --udp-services-configmap=$(POD_NAMESPACE)/udp-services - --publish-service=$(POD_NAMESPACE)/ingress-nginx - --annotations-prefix=nginx.ingress.kubernetes.io - --watch-namespace=${NAMESPACE}