Skip to content

Commit

Permalink
add process for lb-svc ports update (#4660)
Browse files Browse the repository at this point in the history
Signed-off-by: 马洪贞 <hzma@alauda.io>
  • Loading branch information
hongzhen-ma authored Nov 5, 2024
1 parent 451a463 commit 6842df3
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 9 deletions.
29 changes: 25 additions & 4 deletions dist/images/vpcnatgateway/lb-svc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,43 @@ function add_dnat() {
done
}

function del_dnat() {
for rule in $@
do
arr=(${rule//,/ })
eip=(${arr[0]//\// })
dport=${arr[1]}
protocol=${arr[2]}
internalIp=${arr[3]}
internalPort=${arr[4]}

checkRule="-d $eip -p $protocol --dport $dport -j DNAT --to-destination $internalIp:$internalPort"
if iptables -t nat -C PREROUTING $checkRule > /dev/null 2>&1; then
exec_cmd "iptables -t nat -D PREROUTING -d $eip -p $protocol --dport $dport -j DNAT --to-destination $internalIp:$internalPort"
fi
done
}

rules=${@:2:${#}}
opt=$1
case $opt in
init)
init)
echo "init $rules"
init $rules
;;
eip-add)
eip-add)
echo "eip-add $rules"
add_eip $rules
;;
dnat-add)
dnat-add)
echo "dnat-add $rules"
add_dnat $rules
;;
*)
dnat-del)
echo "dnat-del rules"
del_dnat $rules
;;
*)
echo "Usage: $0 [init|eip-add|dnat-add] ..."
exit 1
;;
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ type Controller struct {
serviceSynced cache.InformerSynced
addServiceQueue workqueue.TypedRateLimitingInterface[string]
deleteServiceQueue workqueue.TypedRateLimitingInterface[*vpcService]
updateServiceQueue workqueue.TypedRateLimitingInterface[string]
updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
svcKeyMutex keymutex.KeyMutex

endpointsLister v1.EndpointsLister
Expand Down Expand Up @@ -454,7 +454,7 @@ func Run(ctx context.Context, config *Configuration) {
serviceSynced: serviceInformer.Informer().HasSynced,
addServiceQueue: newTypedRateLimitingQueue[string]("AddService", nil),
deleteServiceQueue: newTypedRateLimitingQueue[*vpcService]("DeleteService", nil),
updateServiceQueue: newTypedRateLimitingQueue[string]("UpdateService", nil),
updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
svcKeyMutex: keymutex.NewHashed(numKeyLocks),

endpointsLister: endpointInformer.Lister(),
Expand Down
63 changes: 61 additions & 2 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"reflect"
"slices"
"strings"
"time"
Expand All @@ -26,6 +27,12 @@ type vpcService struct {
Svc *v1.Service
}

type updateSvcObject struct {
key string
oldPorts []v1.ServicePort
newPorts []v1.ServicePort
}

func (c *Controller) enqueueAddService(obj interface{}) {
var key string
var err error
Expand Down Expand Up @@ -122,7 +129,12 @@ func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
key = strings.Join([]string{key, ipsToDelStr}, "#")
}

c.updateServiceQueue.Add(key)
updateSvc := &updateSvcObject{
key: key,
oldPorts: oldSvc.Spec.Ports,
newPorts: newSvc.Spec.Ports,
}
c.updateServiceQueue.Add(updateSvc)
}

func (c *Controller) handleDeleteService(service *vpcService) error {
Expand Down Expand Up @@ -193,7 +205,8 @@ func (c *Controller) handleDeleteService(service *vpcService) error {
return nil
}

func (c *Controller) handleUpdateService(key string) error {
func (c *Controller) handleUpdateService(svcObject *updateSvcObject) error {
key := svcObject.key
keys := strings.Split(key, "#")
key = keys[0]
var ipsToDel []string
Expand Down Expand Up @@ -330,6 +343,35 @@ func (c *Controller) handleUpdateService(key string) error {
if needUpdateEndpointQueue {
c.addOrUpdateEndpointQueue.Add(key)
}

if c.config.EnableLbSvc && svc.Spec.Type == v1.ServiceTypeLoadBalancer {
changed, err := c.checkLbSvcDeployAnnotationChanged(svc)
if err != nil {
klog.Errorf("failed to check annotation change for lb svc %s: %v", key, err)
return err
}

// only process svc.spec.ports update
if !changed {
klog.Infof("update loadbalancer service %s", key)
pod, err := c.getLbSvcPod(name, namespace)
if err != nil {
klog.Errorf("failed to get pod for lb svc %s: %v", key, err)
return err
}

toDel := diffSvcPorts(svcObject.oldPorts, svcObject.newPorts)
if err := c.delDnatRules(pod, toDel, svc); err != nil {
klog.Errorf("failed to delete dnat rules, err: %v", err)
return err
}
if err = c.updatePodAttachNets(pod, svc); err != nil {
klog.Errorf("failed to update pod attachment network for lb svc %s: %v", key, err)
return err
}
}
}

return nil
}

Expand Down Expand Up @@ -448,3 +490,20 @@ func getVipIps(svc *v1.Service) []string {
}
return ips
}

func diffSvcPorts(oldPorts, newPorts []v1.ServicePort) (toDel []v1.ServicePort) {
for _, oldPort := range oldPorts {
found := false
for _, newPort := range newPorts {
if reflect.DeepEqual(oldPort, newPort) {
found = true
break
}
}
if !found {
toDel = append(toDel, oldPort)
}
}

return toDel
}
52 changes: 51 additions & 1 deletion pkg/controller/service_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
initRouteTable = "init"
podEIPAdd = "eip-add"
podDNATAdd = "dnat-add"
podDNATDel = "dnat-del"
attachmentName = "lb-svc-attachment"
attachmentNs = "kube-system"
)
Expand Down Expand Up @@ -326,7 +327,11 @@ func (c *Controller) updatePodAttachNets(pod *corev1.Pod, svc *corev1.Service) e
}

var rules []string
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d,%s", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, port.Port, defaultGateway))
targetPort := port.TargetPort.IntValue()
if targetPort == 0 {
targetPort = int(port.Port)
}
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d,%s", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, targetPort, defaultGateway))
klog.Infof("add dnat rules for lb svc pod, %v", rules)
if err := c.execNatRules(pod, podDNATAdd, rules); err != nil {
klog.Errorf("failed to add dnat for pod, err: %v", err)
Expand Down Expand Up @@ -395,3 +400,48 @@ func (c *Controller) checkAndReInitLbSvcPod(pod *corev1.Pod) error {

return nil
}

func (c *Controller) checkLbSvcDeployAnnotationChanged(svc *corev1.Service) (bool, error) {
deployName := genLbSvcDpName(svc.Name)
deploy, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Get(context.Background(), deployName, metav1.GetOptions{})
if err != nil {
return false, err
}

if newDeploy := c.updateLbSvcDeployment(svc, deploy); newDeploy == nil {
klog.V(3).Infof("no need to update deployment %s/%s", deploy.Namespace, deploy.Name)
return false, nil
}
return true, nil
}

func (c *Controller) delDnatRules(pod *corev1.Pod, toDel []corev1.ServicePort, svc *corev1.Service) error {
providerName := getAttachNetworkProvider(svc)
attachIPAnnotation := fmt.Sprintf(util.IPAddressAnnotationTemplate, providerName)
loadBalancerIP := pod.Annotations[attachIPAnnotation]

for _, port := range toDel {
var protocol string
switch port.Protocol {
case corev1.ProtocolTCP:
protocol = util.ProtocolTCP
case corev1.ProtocolUDP:
protocol = util.ProtocolUDP
case corev1.ProtocolSCTP:
protocol = util.ProtocolSCTP
}

var rules []string
targetPort := port.TargetPort.IntValue()
if targetPort == 0 {
targetPort = int(port.Port)
}
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, targetPort))
klog.Infof("delete dnat rules for lb svc pod, %v", rules)
if err := c.execNatRules(pod, podDNATDel, rules); err != nil {
klog.Errorf("failed to del dnat rules for pod, err: %v", err)
return err
}
}
return nil
}

0 comments on commit 6842df3

Please sign in to comment.