Skip to content

Commit

Permalink
Merge pull request #588 from oldthreefeng/develop
Browse files Browse the repository at this point in the history
use channel to manage goroutine when upgrade nodes
  • Loading branch information
oldthreefeng authored Jan 15, 2021
2 parents 206ec10 + 0c218d5 commit 9ec95b9
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 14 deletions.
6 changes: 3 additions & 3 deletions install/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func (s *SealosInstaller) appendApiServer() error {
reader := bufio.NewReader(file)
for {
str, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if strings.Contains(str, ApiServer) {
logger.Info("local %s is already exists %s", etcHostPath, ApiServer)
return nil
}
if err == io.EOF {
break
}
}
write := bufio.NewWriter(file)
write.WriteString(etcHostMap)
Expand Down
2 changes: 1 addition & 1 deletion install/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func (s *SealosInstaller) SendPackage() {
// rm old sealos in package avoid old version problem. if sealos not exist in package then skip rm
kubeHook := fmt.Sprintf("cd /root && rm -rf kube && tar zxvf %s && cd /root/kube/shell && rm -f ../bin/sealos && bash init.sh", pkg)
deletekubectl := `sed -i '/kubectl/d;/sealos/d' /root/.bashrc `
completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo 'command -v sealos &>/dev/null && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc"
completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo '[ -x /usr/bin/sealos ] && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc"
kubeHook = kubeHook + " && " + deletekubectl + " && " + completion
PkgUrl = SendPackage(PkgUrl, s.Hosts, "/root", nil, &kubeHook)

Expand Down
17 changes: 8 additions & 9 deletions install/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package install

import (
"fmt"
"os"
"sync"
"os"
"time"

"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -95,15 +94,16 @@ func (u *SealosUpgrade) UpgradeOtherMaster() {
}

func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
var wg sync.WaitGroup
wg := NewPool(2)
var err error
for _, hostname := range hostnames {
wg.Add(1)
go func(node string) {
defer wg.Done()
ip := u.GetIpByHostname(node)
// drain worker node is too danger for prod use; do not drain nodes if worker nodes~
if isMaster {
logger.Info("first: to drain master node %s", node)
logger.Info("[%s] first: to drain master node %s", ip, node)
cmdDrain := fmt.Sprintf(`kubectl drain %s --ignore-daemonsets --delete-local-data`, node)
err := SSHConfig.CmdAsync(u.Masters[0], cmdDrain)
if err != nil {
Expand All @@ -114,8 +114,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
}

// second to exec kubeadm upgrade node
logger.Info("second: to exec kubeadm upgrade node on %s", node)
ip := u.GetIpByHostname(node)
logger.Info("[%s] second: to exec kubeadm upgrade node on %s", ip, node)
var cmdUpgrade string
if ip == u.Masters[0] {
cmdUpgrade = fmt.Sprintf("kubeadm upgrade apply --certificate-renewal=false --yes %s", u.NewVersion)
Expand All @@ -134,7 +133,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
}

// third to restart kubelet
logger.Info("third: to restart kubelet on %s", node)
logger.Info("[%s] third: to restart kubelet on %s", ip, node)
err = SSHConfig.CmdAsync(ip, "systemctl daemon-reload && systemctl restart kubelet")
if err != nil {
logger.Error("systemctl daemon-reload && systemctl restart kubelet err: ", err)
Expand All @@ -144,14 +143,14 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
time.Sleep(time.Second * 10)
k8sNode, _ := k8s.GetNodeByName(u.Client, node)
if k8s.IsNodeReady(*k8sNode) {
logger.Info("fourth: %s nodes is ready", node)
logger.Info("[%s] fourth: %s nodes is ready", ip,node)

// fifth to uncordon node
err = k8s.CordonUnCordon(u.Client, node, false)
if err != nil {
logger.Error(`k8s.CordonUnCordon err: %s, \n After upgrade, please run "kubectl uncordon %s" to enable Scheduling`, err, node)
}
logger.Info("fifth: to uncordon node, 10 seconds to wait for %s uncordon", node)
logger.Info("[%s] fifth: to uncordon node, 10 seconds to wait for %s uncordon", ip, node)
} else {
logger.Error("fourth: %s nodes is not ready, please check the nodes logs to find out reason", node)
}
Expand Down
38 changes: 38 additions & 0 deletions install/upgrade_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package install

import "sync"


type uPool struct {
queue chan int
wg *sync.WaitGroup
}

func NewPool (size int) *uPool {
if size <= 1 {
size = 1
}
return &uPool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}

func (p *uPool) Add(delta int) {
for i := 0; i < delta; i++ {
p.queue <- 1
}
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}

func (p *uPool) Done() {
<-p.queue
p.wg.Done()
}

func (p *uPool) Wait() {
p.wg.Wait()
}
17 changes: 17 additions & 0 deletions install/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package install
import (
"fmt"
"reflect"
"runtime"
"strings"
"testing"
"time"
)

func TestPath(t *testing.T) {
Expand Down Expand Up @@ -228,4 +230,19 @@ func TestFor120(t *testing.T) {
}
})
}
}

func Test_Example(t *testing.T) {
pool := NewPool(2)
println(runtime.NumGoroutine())
for i := 0; i < 10; i++ {
pool.Add(1)
go func(n int) {
time.Sleep(time.Second)
println(runtime.NumGoroutine(), n)
pool.Done()
}(i)
}
pool.Wait()
println(runtime.NumGoroutine())
}
2 changes: 1 addition & 1 deletion k8s/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func CordonUnCordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
return err
}
if node.Spec.Unschedulable == cordoned {
logger.Alert("Node %s is already cordoned: %v", nodeName, cordoned)
logger.Info("Node %s is already Uncordoned, skip...", nodeName)
return nil
}
node.Spec.Unschedulable = cordoned
Expand Down

0 comments on commit 9ec95b9

Please sign in to comment.