Skip to content

Commit

Permalink
ingress: add nodepool endpoints filtering for nginx ingress controller
Browse files Browse the repository at this point in the history
When users access service through nodepool ingress, ingress controller
will balance the users rquests to the endpoints binding to the service,
the ingress controller in a nodepool should only balance the requests
to endpoints belonging to this nodepool, so the service endpoints need
to be filtered to exclude endpoints from other nodepools.

Note:
Currently we leverage yurthub data filtering framework to achieve the
purpose, which is actually a node level sidecar solution, if a nodepool
level data filtering sidecar is implemented in future, nodepool ingress
data filtering is suggested to switch to that solution.

Signed-off-by: zhenggu1 <zhengguang.zhang@intel.com>
  • Loading branch information
zhenggu1 committed Dec 28, 2021
1 parent c23c8d1 commit ce91eac
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
Expand Down Expand Up @@ -288,6 +289,7 @@ func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
ingresscontroller.Register(filters)
}

// createFilterChain return union filters that initializations completed.
Expand Down
4 changes: 4 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// If end users want to use specified LB service at the edge side,
// End users should add annotation["openyurt.io/skip-discard"]="true" for LB service.
SkipDiscardServiceAnnotation = "openyurt.io/skip-discard"

// ingresscontroller filter is used to reassemble endpoints in order to make the data traffic be
// load balanced only to the nodepool valid endpoints.
IngressControllerFilterName = "ingresscontroller"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
Expand Down
168 changes: 168 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"fmt"
"io"
"net/http"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"
)

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(filter.IngressControllerFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}

func NewFilter() *ingressControllerFilter {
return &ingressControllerFilter{
Approver: filter.NewApprover("nginx-ingress-controller", "endpoints", []string{"list", "watch"}...),
workingMode: util.WorkingModeEdge,
stopCh: make(chan struct{}),
}
}

type ingressControllerFilter struct {
*filter.Approver
serviceLister listers.ServiceLister
serviceSynced cache.InformerSynced
nodepoolLister appslisters.NodePoolLister
nodePoolSynced cache.InformerSynced
nodeGetter filter.NodeGetter
nodeSynced cache.InformerSynced
nodeName string
serializerManager *serializer.SerializerManager
workingMode util.WorkingMode
stopCh chan struct{}
}

func (ssf *ingressControllerFilter) SetWorkingMode(mode util.WorkingMode) error {
ssf.workingMode = mode
return nil
}

func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error {
ssf.serviceLister = factory.Core().V1().Services().Lister()
ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced

if ssf.workingMode == util.WorkingModeCloud {
klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", ssf.nodeName)
ssf.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced
ssf.nodeGetter = factory.Core().V1().Nodes().Lister().Get
}

return nil
}

func (ssf *ingressControllerFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error {
ssf.nodepoolLister = yurtFactory.Apps().V1alpha1().NodePools().Lister()
ssf.nodePoolSynced = yurtFactory.Apps().V1alpha1().NodePools().Informer().HasSynced

return nil
}

func (ssf *ingressControllerFilter) SetNodeName(nodeName string) error {
ssf.nodeName = nodeName

return nil
}

func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error {
if len(ssf.nodeName) == 0 {
return fmt.Errorf("node name for ingressControllerFilter is not ready")
}

// hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode
if ssf.workingMode == util.WorkingModeCloud {
return nil
}
klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", ssf.nodeName)

nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName)
ssf.nodeSynced = func() bool {
obj, err := s.Get(nodeKey)
if err != nil || obj == nil {
return false
}

if _, ok := obj.(*v1.Node); !ok {
return false
}

return true
}

ssf.nodeGetter = func(name string) (*v1.Node, error) {
obj, err := s.Get(nodeKey)
if err != nil {
return nil, err
} else if obj == nil {
return nil, fmt.Errorf("node(%s) is not ready", name)
}

if node, ok := obj.(*v1.Node); ok {
return node, nil
}

return nil, fmt.Errorf("node(%s) is not found", name)
}

return nil
}

func (ssf *ingressControllerFilter) SetSerializerManager(s *serializer.SerializerManager) error {
ssf.serializerManager = s
return nil
}

func (ssf *ingressControllerFilter) Approve(comp, resource, verb string) bool {
if !ssf.Approver.Approve(comp, resource, verb) {
return false
}

if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok {
return false
}

return true
}

func (ssf *ingressControllerFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) {
s := filterutil.CreateSerializer(req, ssf.serializerManager)
if s == nil {
klog.Errorf("skip filter, failed to create serializer in ingressControllerFilter")
return 0, rc, nil
}

handler := NewIngressControllerFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.IngressControllerFilterName, stopCh)
}
168 changes: 168 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"io"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"
)

type ingressControllerFilterHandler struct {
nodeName string
serializer *serializer.Serializer
serviceLister listers.ServiceLister
nodePoolLister appslisters.NodePoolLister
nodeGetter filter.NodeGetter
}

func NewIngressControllerFilterHandler(
nodeName string,
serializer *serializer.Serializer,
serviceLister listers.ServiceLister,
nodePoolLister appslisters.NodePoolLister,
nodeGetter filter.NodeGetter) filter.Handler {
return &ingressControllerFilterHandler{
nodeName: nodeName,
serializer: serializer,
serviceLister: serviceLister,
nodePoolLister: nodePoolLister,
nodeGetter: nodeGetter,
}
}

//ObjectResponseFilter filter the endpoints from get response object and return the bytes
func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
eps, err := fh.serializer.Decode(b)
if err != nil || eps == nil {
klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of ingressControllerFilterHandler, %v", err)
return b, nil
}

endpointsList, ok := eps.(*v1.EndpointsList)
if !ok {
return b, nil
}
//filter endpoints
var items []v1.Endpoints
for i := range endpointsList.Items {
item := fh.reassembleEndpoint(&endpointsList.Items[i])
if item != nil {
items = append(items, *item)
}
}
endpointsList.Items = items

return fh.serializer.Encode(endpointsList)
}

//FilterWatchObject filter the endpoints from watch response object and return the bytes
func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error {
defer func() {
close(ch)
}()

d, err := fh.serializer.WatchDecoder(rc)
if err != nil {
klog.Errorf("StreamResponseFilter of ingressControllerFilterHandler ended with error, %v", err)
return err
}
for {
watchType, obj, err := d.Decode()
if err != nil {
return err
}
var wEvent watch.Event
wEvent.Type = watchType
endpoints, ok := obj.(*v1.Endpoints)
if ok {
item := fh.reassembleEndpoint(endpoints)
if item == nil {
continue
}
wEvent.Object = item
} else {
wEvent.Object = obj
}
klog.V(5).Infof("filter watch decode endpoint: type: %s, obj=%#+v", watchType, endpoints)
ch <- wEvent
}
}

// reassembleEndpoints will filter the valid endpoints to its nodepool
func (fh *ingressControllerFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
svcName := endpoints.Name
_, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
return endpoints
}
// filter the endpoints on the node which is in the same nodepool with current node
currentNode, err := fh.nodeGetter(fh.nodeName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
return endpoints
}
if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
nodePool, err := fh.nodePoolLister.Get(nodePoolName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
return endpoints
}
isNodePoolValidEps := false
var newEpsAddr []v1.EndpointAddress
for i := range endpoints.Subsets {
for j := range endpoints.Subsets[i].Addresses {
nodeName := endpoints.Subsets[i].Addresses[j].NodeName
if nodeName == nil {
//ignore endpoints whose NodeName is not set, for example "kubernetes"
continue
}
if inSameNodePool(*nodeName, nodePool.Status.Nodes) {
isNodePoolValidEps = true
newEpsAddr = append(newEpsAddr, endpoints.Subsets[i].Addresses[j])
klog.Infof("endpoints/%s address/%s with nodeName/%s is valid to nodepool/%s",
svcName, endpoints.Subsets[i].Addresses[j].IP, *nodeName, nodePoolName)
}
}
endpoints.Subsets[i].Addresses = newEpsAddr
newEpsAddr = nil
}
if !isNodePoolValidEps {
return nil
}
}
return endpoints
}

func inSameNodePool(nodeName string, nodeList []string) bool {
for _, n := range nodeList {
if nodeName == n {
return true
}
}

return false
}

0 comments on commit ce91eac

Please sign in to comment.