Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use channel to manage goroutine when upgrade nodes #588

Merged
merged 2 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions install/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ func (s *SealosInstaller) appendApiServer() error {
reader := bufio.NewReader(file)
for {
str, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if strings.Contains(str, ApiServer) {
logger.Info("local %s is already exists %s", etcHostPath, ApiServer)
return nil
}
if err == io.EOF {
break
}
Comment on lines 111 to +117
Copy link
Collaborator Author

@oldthreefeng oldthreefeng Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果是最后一行, 就直接break了。 所以把判断是否存在放在前面。

}
write := bufio.NewWriter(file)
write.WriteString(etcHostMap)
Expand Down
2 changes: 1 addition & 1 deletion install/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func (s *SealosInstaller) SendPackage() {
// rm old sealos in package avoid old version problem. if sealos not exist in package then skip rm
kubeHook := fmt.Sprintf("cd /root && rm -rf kube && tar zxvf %s && cd /root/kube/shell && rm -f ../bin/sealos && bash init.sh", pkg)
deletekubectl := `sed -i '/kubectl/d;/sealos/d' /root/.bashrc `
completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo 'command -v sealos &>/dev/null && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc"
completion := "echo 'command -v kubectl &>/dev/null && source <(kubectl completion bash)' >> /root/.bashrc && echo '[ -x /usr/bin/sealos ] && source <(sealos completion bash)' >> /root/.bashrc && source /root/.bashrc"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

避免 permission deny 的问题。 如果 不能执行 sealos 就不需要执行 补全。

kubeHook = kubeHook + " && " + deletekubectl + " && " + completion
PkgUrl = SendPackage(PkgUrl, s.Hosts, "/root", nil, &kubeHook)

Expand Down
17 changes: 8 additions & 9 deletions install/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package install

import (
"fmt"
"os"
"sync"
"os"
"time"

"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -95,15 +94,16 @@ func (u *SealosUpgrade) UpgradeOtherMaster() {
}

func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
var wg sync.WaitGroup
wg := NewPool(2)
var err error
for _, hostname := range hostnames {
wg.Add(1)
go func(node string) {
defer wg.Done()
ip := u.GetIpByHostname(node)
// drain worker node is too danger for prod use; do not drain nodes if worker nodes~
if isMaster {
logger.Info("first: to drain master node %s", node)
logger.Info("[%s] first: to drain master node %s", ip, node)
cmdDrain := fmt.Sprintf(`kubectl drain %s --ignore-daemonsets --delete-local-data`, node)
err := SSHConfig.CmdAsync(u.Masters[0], cmdDrain)
if err != nil {
Expand All @@ -114,8 +114,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
}

// second to exec kubeadm upgrade node
logger.Info("second: to exec kubeadm upgrade node on %s", node)
ip := u.GetIpByHostname(node)
logger.Info("[%s] second: to exec kubeadm upgrade node on %s", ip, node)
var cmdUpgrade string
if ip == u.Masters[0] {
cmdUpgrade = fmt.Sprintf("kubeadm upgrade apply --certificate-renewal=false --yes %s", u.NewVersion)
Expand All @@ -134,7 +133,7 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
}

// third to restart kubelet
logger.Info("third: to restart kubelet on %s", node)
logger.Info("[%s] third: to restart kubelet on %s", ip, node)
err = SSHConfig.CmdAsync(ip, "systemctl daemon-reload && systemctl restart kubelet")
if err != nil {
logger.Error("systemctl daemon-reload && systemctl restart kubelet err: ", err)
Expand All @@ -144,14 +143,14 @@ func (u *SealosUpgrade) upgradeNodes(hostnames []string, isMaster bool) {
time.Sleep(time.Second * 10)
k8sNode, _ := k8s.GetNodeByName(u.Client, node)
if k8s.IsNodeReady(*k8sNode) {
logger.Info("fourth: %s nodes is ready", node)
logger.Info("[%s] fourth: %s nodes is ready", ip,node)

// fifth to uncordon node
err = k8s.CordonUnCordon(u.Client, node, false)
if err != nil {
logger.Error(`k8s.CordonUnCordon err: %s, \n After upgrade, please run "kubectl uncordon %s" to enable Scheduling`, err, node)
}
logger.Info("fifth: to uncordon node, 10 seconds to wait for %s uncordon", node)
logger.Info("[%s] fifth: to uncordon node, 10 seconds to wait for %s uncordon", ip, node)
} else {
logger.Error("fourth: %s nodes is not ready, please check the nodes logs to find out reason", node)
}
Expand Down
38 changes: 38 additions & 0 deletions install/upgrade_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package install

import "sync"


type uPool struct {
queue chan int
wg *sync.WaitGroup
}

func NewPool (size int) *uPool {
if size <= 1 {
size = 1
}
return &uPool{
queue: make(chan int, size),
wg: &sync.WaitGroup{},
}
}

func (p *uPool) Add(delta int) {
for i := 0; i < delta; i++ {
p.queue <- 1
}
for i := 0; i > delta; i-- {
<-p.queue
}
p.wg.Add(delta)
}

func (p *uPool) Done() {
<-p.queue
p.wg.Done()
}

func (p *uPool) Wait() {
p.wg.Wait()
}
17 changes: 17 additions & 0 deletions install/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package install
import (
"fmt"
"reflect"
"runtime"
"strings"
"testing"
"time"
)

func TestPath(t *testing.T) {
Expand Down Expand Up @@ -228,4 +230,19 @@ func TestFor120(t *testing.T) {
}
})
}
}

func Test_Example(t *testing.T) {
pool := NewPool(2)
println(runtime.NumGoroutine())
for i := 0; i < 10; i++ {
pool.Add(1)
go func(n int) {
time.Sleep(time.Second)
println(runtime.NumGoroutine(), n)
pool.Done()
}(i)
}
pool.Wait()
println(runtime.NumGoroutine())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
2 changes: 1 addition & 1 deletion k8s/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func CordonUnCordon(k8sClient *kubernetes.Clientset, nodeName string, cordoned b
return err
}
if node.Spec.Unschedulable == cordoned {
logger.Alert("Node %s is already cordoned: %v", nodeName, cordoned)
logger.Info("Node %s is already Uncordoned, skip...", nodeName)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果已经是ready。 则INFO 而不是Alert

return nil
}
node.Spec.Unschedulable = cordoned
Expand Down