Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingress: add nodepool endpoints filtering for nginx ingress controller #696

Merged
merged 1 commit into from
Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/discardcloudservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/ingresscontroller"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/initializer"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
Expand Down Expand Up @@ -288,6 +289,7 @@ func registerAllFilters(filters *filter.Filters) {
servicetopology.Register(filters)
masterservice.Register(filters)
discardcloudservice.Register(filters)
ingresscontroller.Register(filters)
}

// createFilterChain return union filters that initializations completed.
Expand Down
4 changes: 4 additions & 0 deletions pkg/yurthub/filter/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// If end users want to use specified LB service at the edge side,
// End users should add annotation["openyurt.io/skip-discard"]="true" for LB service.
SkipDiscardServiceAnnotation = "openyurt.io/skip-discard"

// ingresscontroller filter is used to reassemble endpoints in order to make the data traffic be
// load balanced only to the nodepool valid endpoints.
IngressControllerFilterName = "ingresscontroller"
)

// DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode.
Expand Down
168 changes: 168 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
Copyright 2021 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"fmt"
"io"
"net/http"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/cachemanager"
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
filterutil "github.com/openyurtio/openyurt/pkg/yurthub/filter/util"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
yurtinformers "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/informers/externalversions"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"
)

// Register registers a filter
func Register(filters *filter.Filters) {
filters.Register(filter.IngressControllerFilterName, func() (filter.Interface, error) {
return NewFilter(), nil
})
}

func NewFilter() *ingressControllerFilter {
return &ingressControllerFilter{
Approver: filter.NewApprover("nginx-ingress-controller", "endpoints", []string{"list", "watch"}...),
workingMode: util.WorkingModeEdge,
stopCh: make(chan struct{}),
}
}

type ingressControllerFilter struct {
*filter.Approver
serviceLister listers.ServiceLister
serviceSynced cache.InformerSynced
nodepoolLister appslisters.NodePoolLister
nodePoolSynced cache.InformerSynced
nodeGetter filter.NodeGetter
nodeSynced cache.InformerSynced
nodeName string
serializerManager *serializer.SerializerManager
workingMode util.WorkingMode
stopCh chan struct{}
}

func (ssf *ingressControllerFilter) SetWorkingMode(mode util.WorkingMode) error {
ssf.workingMode = mode
return nil
}

func (ssf *ingressControllerFilter) SetSharedInformerFactory(factory informers.SharedInformerFactory) error {
ssf.serviceLister = factory.Core().V1().Services().Lister()
ssf.serviceSynced = factory.Core().V1().Services().Informer().HasSynced

if ssf.workingMode == util.WorkingModeCloud {
klog.Infof("prepare list/watch to sync node(%s) for cloud working mode", ssf.nodeName)
ssf.nodeSynced = factory.Core().V1().Nodes().Informer().HasSynced
ssf.nodeGetter = factory.Core().V1().Nodes().Lister().Get
}

return nil
}

func (ssf *ingressControllerFilter) SetYurtSharedInformerFactory(yurtFactory yurtinformers.SharedInformerFactory) error {
ssf.nodepoolLister = yurtFactory.Apps().V1alpha1().NodePools().Lister()
ssf.nodePoolSynced = yurtFactory.Apps().V1alpha1().NodePools().Informer().HasSynced

return nil
}

func (ssf *ingressControllerFilter) SetNodeName(nodeName string) error {
ssf.nodeName = nodeName

return nil
}

func (ssf *ingressControllerFilter) SetStorageWrapper(s cachemanager.StorageWrapper) error {
if len(ssf.nodeName) == 0 {
return fmt.Errorf("node name for ingressControllerFilter is not ready")
}

rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
// hub agent will list/watch node from kube-apiserver when hub agent work as cloud mode
if ssf.workingMode == util.WorkingModeCloud {
return nil
}
klog.Infof("prepare local disk storage to sync node(%s) for edge working mode", ssf.nodeName)

nodeKey := fmt.Sprintf("kubelet/nodes/%s", ssf.nodeName)
ssf.nodeSynced = func() bool {
obj, err := s.Get(nodeKey)
if err != nil || obj == nil {
return false
}

if _, ok := obj.(*v1.Node); !ok {
return false
}

return true
}

ssf.nodeGetter = func(name string) (*v1.Node, error) {
obj, err := s.Get(nodeKey)
if err != nil {
return nil, err
} else if obj == nil {
return nil, fmt.Errorf("node(%s) is not ready", name)
}

if node, ok := obj.(*v1.Node); ok {
return node, nil
}

return nil, fmt.Errorf("node(%s) is not found", name)
}

return nil
}

func (ssf *ingressControllerFilter) SetSerializerManager(s *serializer.SerializerManager) error {
ssf.serializerManager = s
return nil
}

func (ssf *ingressControllerFilter) Approve(comp, resource, verb string) bool {
if !ssf.Approver.Approve(comp, resource, verb) {
return false
}

if ok := cache.WaitForCacheSync(ssf.stopCh, ssf.nodeSynced, ssf.serviceSynced, ssf.nodePoolSynced); !ok {
return false
}

return true
}

func (ssf *ingressControllerFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) {
s := filterutil.CreateSerializer(req, ssf.serializerManager)
if s == nil {
klog.Errorf("skip filter, failed to create serializer in ingressControllerFilter")
return 0, rc, nil
}

handler := NewIngressControllerFilterHandler(ssf.nodeName, s, ssf.serviceLister, ssf.nodepoolLister, ssf.nodeGetter)
return filter.NewFilterReadCloser(req, rc, handler, s, filter.IngressControllerFilterName, stopCh)
}
174 changes: 174 additions & 0 deletions pkg/yurthub/filter/ingresscontroller/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
Copyright 2021 The OpenYurt Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ingresscontroller

import (
"io"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
nodepoolv1alpha1 "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/apis/apps/v1alpha1"
appslisters "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/listers/apps/v1alpha1"
)

type ingressControllerFilterHandler struct {
nodeName string
serializer *serializer.Serializer
serviceLister listers.ServiceLister
nodePoolLister appslisters.NodePoolLister
nodeGetter filter.NodeGetter
}

func NewIngressControllerFilterHandler(
nodeName string,
serializer *serializer.Serializer,
serviceLister listers.ServiceLister,
nodePoolLister appslisters.NodePoolLister,
nodeGetter filter.NodeGetter) filter.Handler {
return &ingressControllerFilterHandler{
nodeName: nodeName,
serializer: serializer,
serviceLister: serviceLister,
nodePoolLister: nodePoolLister,
nodeGetter: nodeGetter,
}
}

// ObjectResponseFilter filter the endpoints from get response object and return the bytes
func (fh *ingressControllerFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
eps, err := fh.serializer.Decode(b)
if err != nil || eps == nil {
klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of ingressControllerFilterHandler, %v", err)
return b, nil
}

endpointsList, ok := eps.(*v1.EndpointsList)
if !ok {
return b, nil
}
// filter endpoints
var items []v1.Endpoints
for i := range endpointsList.Items {
item := fh.reassembleEndpoint(&endpointsList.Items[i])
if item != nil {
items = append(items, *item)
}
}
endpointsList.Items = items

return fh.serializer.Encode(endpointsList)
}

// FilterWatchObject filter the endpoints from watch response object and return the bytes
func (fh *ingressControllerFilterHandler) StreamResponseFilter(rc io.ReadCloser, ch chan watch.Event) error {
defer func() {
close(ch)
}()

d, err := fh.serializer.WatchDecoder(rc)
if err != nil {
klog.Errorf("StreamResponseFilter of ingressControllerFilterHandler ended with error, %v", err)
return err
}
for {
watchType, obj, err := d.Decode()
if err != nil {
return err
}
var wEvent watch.Event
wEvent.Type = watchType
endpoints, ok := obj.(*v1.Endpoints)
if ok {
item := fh.reassembleEndpoint(endpoints)
if item == nil {
continue
}
wEvent.Object = item
} else {
wEvent.Object = obj
}
klog.V(5).Infof("filter watch decode endpoint: type: %s, obj=%#+v", watchType, endpoints)
ch <- wEvent
}
}

// reassembleEndpoints will filter the valid endpoints to its nodepool
func (fh *ingressControllerFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
svcName := endpoints.Name
_, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
return endpoints
}
// filter the endpoints on the node which is in the same nodepool with current node
currentNode, err := fh.nodeGetter(fh.nodeName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
return endpoints
}
if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
nodePool, err := fh.nodePoolLister.Get(nodePoolName)
if err != nil {
klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
return endpoints
}
var newEpSubsets []v1.EndpointSubset
for i := range endpoints.Subsets {
endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
}
}
endpoints.Subsets = newEpSubsets
if len(endpoints.Subsets) == 0 {
// this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
return nil
rambohe-ch marked this conversation as resolved.
Show resolved Hide resolved
}
}
return endpoints
}

func filterValidEndpointsAddr(addresses []v1.EndpointAddress, nodePool *nodepoolv1alpha1.NodePool) []v1.EndpointAddress {
var newEpAddresses []v1.EndpointAddress
for i := range addresses {
nodeName := addresses[i].NodeName
if nodeName == nil {
// ignore endpoints whose NodeName is not set, for example "kubernetes"
continue
}
if inSameNodePool(*nodeName, nodePool.Status.Nodes) {
newEpAddresses = append(newEpAddresses, addresses[i])
klog.Infof("endpoints address/%s with nodeName/%s is valid to nodepool/%s", addresses[i].IP, *nodeName, nodePool.Name)
}
}
return newEpAddresses
}

func inSameNodePool(nodeName string, nodeList []string) bool {
for _, n := range nodeList {
if nodeName == n {
return true
}
}
return false
}