Skip to content

Commit

Permalink
ingress: add nodepool endpoints filtering for nginx ingress controller
Browse files Browse the repository at this point in the history
When users access service through nodepool ingress, ingress controller
will balance the users' requests to the endpoints bound to the service,
the ingress controller in a nodepool should only balance the requests
to endpoints belonging to this nodepool, so the service endpoints need
to be filtered to exclude endpoints from other nodepools.

Note:
Currently we leverage yurthub data filtering framework to achieve the
purpose, which is actually a node level sidecar solution, if a nodepool
level data filtering sidecar is implemented in future, nodepool ingress
data filtering is suggested to switch to that solution.

Signed-off-by: zhenggu1 <zhengguang.zhang@intel.com>
  • Loading branch information
zhenggu1 committed Dec 26, 2021
1 parent 5063752 commit 1213413
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
Expand Down Expand Up @@ -279,6 +280,7 @@ func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
// ingresscontroller: reassembles endpoints responses so that an nginx ingress
// controller inside a nodepool only load balances to endpoints of that pool.
ingresscontroller.Register(filters)
}

// createFilterChain return union filters that initializations completed.
Expand Down
4 changes: 4 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// If end users want to use specified LB service at the edge side,
// End users should add annotation["openyurt.io/skip-discard"]="true" for LB service.
SkipDiscardServiceAnnotation = "openyurt.io/skip-discard"

// IngressControllerFilterName names the filter that reassembles endpoints so
// that ingress data traffic is load balanced only to endpoints that are valid
// for the local node's nodepool.
IngressControllerFilterName = "ingresscontroller"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
Expand Down
148 changes: 148 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"fmt"
"io"
"net/http"

"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

// Register hooks the ingresscontroller filter into the yurthub filter
// registry under filter.IngressControllerFilterName.
func Register(filters *filter.Filters) {
	initFunc := func() (filter.Interface, error) {
		return NewFilter(), nil
	}
	filters.Register(filter.IngressControllerFilterName, initFunc)
}

// NewFilter returns an ingressControllerFilter that approves endpoints
// list/watch requests issued by the nginx-ingress-controller component.
func NewFilter() *ingressControllerFilter {
	f := &ingressControllerFilter{
		Approver: filter.NewApprover("nginx-ingress-controller", "endpoints", "list", "watch"),
		stopCh:   make(chan struct{}),
	}
	return f
}

// ingressControllerFilter filters endpoints list/watch responses requested by
// the nginx ingress controller so that only endpoints belonging to the local
// node's nodepool are returned.
type ingressControllerFilter struct {
	// Approver decides which component/resource/verb combinations this filter handles.
	*filter.Approver
	serviceLister     listers.ServiceLister         // lists services from the shared informer cache
	serviceSynced     cache.InformerSynced          // reports whether the service cache has synced
	nodepoolLister    appslisters.NodePoolLister    // lists NodePool objects from the yurt informer cache
	nodePoolSynced    cache.InformerSynced          // reports whether the nodepool cache has synced
	nodeGetter        filter.NodeGetter             // fetches the local node object from yurthub storage
	nodeSynced        cache.InformerSynced          // reports whether the local node is present in storage
	nodeName          string                        // name of the node this yurthub runs on
	serializerManager *serializer.SerializerManager // decodes/encodes response payloads
	stopCh            chan struct{}                 // bounds cache-sync waits in Approve; never closed in visible code
}

// SetSharedInformerFactory wires the service lister and its cache-sync probe
// from the shared informer factory.
func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error {
	svcInformer := factory.Core().V1().Services()
	ssf.serviceLister = svcInformer.Lister()
	ssf.serviceSynced = svcInformer.Informer().HasSynced
	return nil
}

// SetYurtSharedInformerFactory wires the NodePool lister and its cache-sync
// probe from the yurt-app-manager informer factory.
func (ssf *ingressControllerFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error {
	npInformer := yurtFactory.Apps().V1alpha1().NodePools()
	ssf.nodepoolLister = npInformer.Lister()
	ssf.nodePoolSynced = npInformer.Informer().HasSynced
	return nil
}

// SetNodeName records the name of the node this yurthub instance serves.
// It must be called before SetStorageWrapper, which rejects an empty name.
func (ssf *ingressControllerFilter) SetNodeName(nodeName string) error {
ssf.nodeName = nodeName

return nil
}

// SetStorageWrapper builds the node-sync probe and node getter on top of the
// yurthub local storage. It requires SetNodeName to have been called first,
// because the storage key of the local node is derived from the node name.
func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error {
	if ssf.nodeName == "" {
		return fmt.Errorf("node name for ingressControllerFilter is not ready")
	}

	nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName)

	// nodeSynced reports true once the local node object is readable from storage.
	ssf.nodeSynced = func() bool {
		obj, err := s.Get(nodeKey)
		if err != nil || obj == nil {
			return false
		}
		_, isNode := obj.(*v1.Node)
		return isNode
	}

	// nodeGetter fetches the local node. NOTE: only the local node is cached,
	// so the lookup always uses nodeKey regardless of the requested name.
	ssf.nodeGetter = func(name string) (*v1.Node, error) {
		obj, err := s.Get(nodeKey)
		if err != nil {
			return nil, err
		}
		if obj == nil {
			return nil, fmt.Errorf("node(%s) is not ready", name)
		}
		if node, ok := obj.(*v1.Node); ok {
			return node, nil
		}
		return nil, fmt.Errorf("node(%s) is not found", name)
	}

	return nil
}

// SetSerializerManager stores the serializer manager used by Filter to decode
// and re-encode response payloads.
func (ssf *ingressControllerFilter) SetSerializerManager(s *serializer.SerializerManager) error {
ssf.serializerManager = s
return nil
}

// Approve reports whether a request from component comp for the given
// resource/verb should be handled by this filter. When the approver accepts
// the request, Approve additionally blocks until the node, service and
// nodepool caches have synced (bounded only by ssf.stopCh).
func (ssf *ingressControllerFilter) Approve(comp, resource, verb string) bool {
	if !ssf.Approver.Approve(comp, resource, verb) {
		return false
	}

	return cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced)
}

// Filter wraps rc in a reader that reassembles endpoints in the response body
// for the local nodepool. If no serializer can be created for the request, the
// response is passed through unchanged.
func (ssf *ingressControllerFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) {
	respSerializer := filterutil.CreateSerializer(req, ssf.serializerManager)
	if respSerializer == nil {
		klog.Errorf("skip filter, failed to create serializer in ingressControllerFilter")
		return 0, rc, nil
	}

	handler := NewIngressControllerFilterHandler(ssf.nodeName, respSerializer, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter)
	return filter.NewFilterReadCloser(req, rc, handler, respSerializer, filter.IngressControllerFilterName, stopCh)
}
179 changes: 179 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"io"

"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
)

// Annotation key and values describing a service's topology scope.
// NOTE(review): these constants are not referenced by the filtering code
// visible in this file — confirm whether they are used elsewhere or are dead.
// NOTE(review): "kubernetes.io/zone" differs from the upstream well-known zone
// labels ("topology.kubernetes.io/zone" / "failure-domain.beta.kubernetes.io/zone")
// — verify the intended value.
const (
	AnnotationIngressControllerKey           = "openyurt.io/topologyKeys"
	AnnotationIngressControllerValueNode     = "kubernetes.io/hostname"
	AnnotationIngressControllerValueZone     = "kubernetes.io/zone"
	AnnotationIngressControllerValueNodePool = "openyurt.io/nodepool"
)

// ingressControllerFilterHandler rewrites endpoints objects in list/watch
// responses so that an nginx ingress controller only sees endpoints that are
// valid for the nodepool of the node it runs on.
type ingressControllerFilterHandler struct {
	nodeName       string                     // node this yurthub instance serves
	serializer     *serializer.Serializer     // decodes/encodes the response payload
	serviceLister  listers.ServiceLister      // resolves the service backing an endpoints object
	nodePoolLister appslisters.NodePoolLister // resolves the node's NodePool object
	nodeGetter     filter.NodeGetter          // fetches the local node (for its nodepool label)
}

// NewIngressControllerFilterHandler builds a filter.Handler that rewrites
// endpoints responses for the ingress controller running on node nodeName.
func NewIngressControllerFilterHandler(
	nodeName string,
	serializer *serializer.Serializer,
	serviceLister listers.ServiceLister,
	nodePoolLister appslisters.NodePoolLister,
	nodeGetter filter.NodeGetter) filter.Handler {
	fh := &ingressControllerFilterHandler{
		nodeName:       nodeName,
		serializer:     serializer,
		serviceLister:  serviceLister,
		nodePoolLister: nodePoolLister,
		nodeGetter:     nodeGetter,
	}
	return fh
}

// ObjectResponseFilter decodes an EndpointsList from b, drops endpoints that
// are not valid for this nodepool, and returns the re-encoded list. Any decode
// problem, or a payload that is not an EndpointsList, returns b untouched.
func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
	obj, err := fh.serializer.Decode(b)
	if err != nil || obj == nil {
		klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of ingressControllerFilterHandler, %v", err)
		return b, nil
	}

	endpointsList, ok := obj.(*v1.EndpointsList)
	if !ok {
		return b, nil
	}

	// Keep only the endpoints objects that survive nodepool filtering.
	var kept []v1.Endpoints
	for i := range endpointsList.Items {
		if ep := fh.reassembleEndpoint(&endpointsList.Items[i]); ep != nil {
			kept = append(kept, *ep)
		}
	}
	endpointsList.Items = kept

	return fh.serializer.Encode(endpointsList)
}

// StreamResponseFilter reads watch events from rc, reassembles the endpoints
// carried by each event for this nodepool, and forwards the (possibly
// rewritten) events on ch. Events whose endpoints have no valid address for
// the nodepool (or whose backing service is a LoadBalancer) are dropped.
// ch is closed when decoding stops, and the decode error is returned.
func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error {
	defer close(ch)

	d, err := fh.serializer.WatchDecoder(rc)
	if err != nil {
		klog.Errorf("StreamResponseFilter of ingressControllerFilterHandler ended with error, %v", err)
		return err
	}
	for {
		watchType, obj, err := d.Decode()
		if err != nil {
			return err
		}

		wEvent := watch.Event{Type: watchType, Object: obj}
		if endpoints, ok := obj.(*v1.Endpoints); ok {
			item := fh.reassembleEndpoint(endpoints)
			if item == nil {
				// Nothing left for this nodepool: suppress the event entirely.
				continue
			}
			wEvent.Object = item
		}
		// Fix: log the object actually forwarded; the previous code logged the
		// endpoints variable, which is nil for non-Endpoints objects.
		klog.V(5).Infof("filter watch decode endpoint: type: %s, obj=%#+v", watchType, wEvent.Object)
		ch <- wEvent
	}
}

// reassembleEndpoint discards endpoints of LoadBalancer services and removes
// endpoint addresses that belong to nodes outside the current node's nodepool.
// It mutates the passed endpoints object in place and returns it; a nil return
// means the whole endpoints object should be dropped. On any lookup failure
// the endpoints are returned unfiltered (fail open).
func (fh *ingressControllerFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
	// Endpoints objects share the name/namespace of the service they back.
	svcName := endpoints.Name
	svc, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
	if err != nil {
		klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
		return endpoints
	}
	// discard endpoints if service type is LoadBalancer
	if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
		klog.Infof("endpoints(%s/%s) of load balancer service is discarded", endpoints.Namespace, svcName)
		return nil
	}
	// filter the endpoints on the node which is in the same nodepool with current node
	currentNode, err := fh.nodeGetter(fh.nodeName)
	if err != nil {
		klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
		return endpoints
	}
	if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
		nodePool, err := fh.nodePoolLister.Get(nodePoolName)
		if err != nil {
			klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
			return endpoints
		}
		isNodePoolValidEps := false
		var newEpsA []v1.EndpointAddress
		for i := range endpoints.Subsets {
			for j := range endpoints.Subsets[i].Addresses {
				nodeName := endpoints.Subsets[i].Addresses[j].NodeName
				if nodeName == nil {
					//ignore endpoints whose NodeName is not set, for example "kubernetes"
					continue
				}
				if inSameNodePool(*nodeName, nodePool.Status.Nodes) {
					isNodePoolValidEps = true
					newEpsA = append(newEpsA, endpoints.Subsets[i].Addresses[j])
					klog.Infof("endpoints/%s address IP/%s is valid to nodepool %s", svcName, endpoints.Subsets[i].Addresses[j].IP, nodePoolName)
				}
			}
			// NOTE(review): NotReadyAddresses are left untouched, and a subset whose
			// Addresses are all filtered out stays in the list with an empty address
			// slice — confirm downstream consumers tolerate both.
			endpoints.Subsets[i].Addresses = newEpsA
			newEpsA = nil
		}
		// No address in any subset belongs to this nodepool: drop the object.
		if !isNodePoolValidEps {
			return nil
		}
	}
	// A node without a nodepool label falls through with endpoints unfiltered.
	return endpoints
}

// inSameNodePool reports whether nodeName appears in nodeList, the node name
// list of a NodePool's status.
func inSameNodePool(nodeName string, nodeList []string) bool {
	for i := range nodeList {
		if nodeList[i] == nodeName {
			return true
		}
	}
	return false
}

0 comments on commit 1213413

Please sign in to comment.