Skip to content

Commit

Permalink
Merge pull request #991 from AlexAi27/fix_log_collection_after_contai…
Browse files Browse the repository at this point in the history
…ner_stop

Fix log collection after container stop
  • Loading branch information
wenxinlee2015 authored Oct 11, 2021
2 parents cfe4b45 + 795cda5 commit 4e7862d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 48 deletions.
7 changes: 2 additions & 5 deletions bcs-services/bcs-logbeat-sidecar/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ replace (
)

require (
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210818040851-76fdc539dc33
github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210518090424-99527484a283
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210908080357-99540f892332
github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210810131039-5220f346d815
github.com/containerd/containerd v1.4.3 // indirect
github.com/docker/docker v20.10.0-rc1+incompatible // indirect
github.com/fsouza/go-dockerclient v1.6.5
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/imdario/mergo v0.3.11 // indirect
github.com/moby/sys/mount v0.2.0 // indirect
github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/yaml.v2 v2.3.0
gotest.tools/v3 v3.0.3 // indirect
k8s.io/api v0.18.5
k8s.io/apiextensions-apiserver v0.18.2
k8s.io/apimachinery v0.18.5
Expand Down
2 changes: 1 addition & 1 deletion bcs-services/bcs-logbeat-sidecar/sidecar/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *SidecarController) inspectContainer(ID string) *docker.Container {
return c
case <-timer.C:
cancelFunc()
blog.Errorf("Inspect container %d timeout unexpected, check whether pod is working properly with sharePID mode.", ID)
blog.Errorf("Inspect container %s timeout unexpected, check whether pod is working properly with sharePID mode.", ID)
return nil
}
}
83 changes: 47 additions & 36 deletions bcs-services/bcs-logbeat-sidecar/sidecar/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/blog"
bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1"
Expand Down Expand Up @@ -150,6 +151,20 @@ func (s *SidecarController) listenerDockerEvent() {
}
}()

// freeFunc releases everything kept for a container: it removes the entry
// from containerCache, deletes the container's log config, and then calls
// reloadLogbeat. A reload failure is logged and not propagated to the caller.
freeFunc := func(containerID string) {
	s.containerCacheMutex.Lock()
	delete(s.containerCache, containerID)
	s.containerCacheMutex.Unlock()

	s.Lock()
	s.deleteContainerLogConf(containerID)
	s.Unlock()

	// Assigns to err from the enclosing scope, as the original code did.
	err = s.reloadLogbeat()
	if err != nil {
		blog.Errorf("reload logbeat failed: %s", err.Error())
		// Stop here so the success message below is not logged after a failure.
		return
	}
	blog.V(3).Infof("reload logbeat succ")
}

for {
var msg *docker.APIEvents
select {
Expand All @@ -160,9 +175,11 @@ func (s *SidecarController) listenerDockerEvent() {
switch msg.Action {
//start container
case "start":
blog.Infof("docker action : %+v", *msg)
c := s.inspectContainer(msg.ID)
if c == nil {
blog.Errorf("inspect container %s failed", msg.ID)
freeFunc(msg.ID)
break
}
s.containerCacheMutex.Lock()
Expand All @@ -175,29 +192,29 @@ func (s *SidecarController) listenerDockerEvent() {
}
blog.V(3).Infof("reload logbeat succ")

// destroy container
case "destroy":
s.containerCacheMutex.Lock()
delete(s.containerCache, msg.ID)
s.containerCacheMutex.Unlock()
s.Lock()
s.deleteContainerLogConf(msg.ID)
s.Unlock()
err = s.reloadLogbeat()
if err != nil {
blog.Errorf("reload logbeat failed: %s", err.Error())
// exit container
case "die", "stop":
blog.Infof("docker action : %+v", *msg)
s.containerCacheMutex.RLock()
c, ok := s.containerCache[msg.ID]
s.containerCacheMutex.RUnlock()
if !ok {
blog.Errorf("Container info with containerID (%s) did not in containerCache", msg.ID)
freeFunc(msg.ID)
break
}
blog.V(3).Infof("reload logbeat succ")
// stop container
case "stop":
s.Lock()
s.deleteContainerLogConf(msg.ID)
s.Unlock()
c.State.Running = false
c.State.Dead = true
s.produceContainerLogConf(c)
err = s.reloadLogbeat()
if err != nil {
blog.Errorf("reload logbeat failed: %s", err.Error())
}
blog.V(3).Infof("reload logbeat succ")

// destroy container
case "destroy":
freeFunc(msg.ID)
}
}
}
Expand Down Expand Up @@ -510,6 +527,7 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
matchedLogConfig.LogPaths = logConf.Spec.LogPaths
matchedLogConfig.HostPaths = logConf.Spec.HostPaths
matchedLogConfig.LogTags = logConf.Spec.LogTags
matchedLogConfig.Multiline = logConf.Spec.Multiline
matchedLogConfigs = append(matchedLogConfigs, &matchedLogConfig)
}

Expand All @@ -535,11 +553,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
Paths: make([]string, 0),
ToJSON: true,
OutputFormat: s.conf.LogbeatOutputFormat,
Package: logConf.Spec.PackageCollection,
}
blog.Infof("container info: %+v", *container)
if !container.State.Running {
var closeEOF bool = true
para.CloseEOF = &closeEOF
para.CloseTimeout = time.Duration(time.Duration(logConf.Spec.ExitedContainerLogCloseTimeout) * time.Second).String()
}
if logConf.Spec.PackageCollection {
pack := new(bool)
*pack = true
para.Package = pack
if conf.Multiline != nil && conf.Multiline.Type != "" {
para.Multiline = conf.Multiline
}
para.ExtMeta["io_tencent_bcs_cluster"] = logConf.Spec.ClusterId
para.ExtMeta["io_tencent_bcs_pod"] = pod.Name
Expand All @@ -551,15 +574,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe
para.ExtMeta["io_tencent_bcs_projectid"] = pod.Labels["io.tencent.paas.projectid"]
para.ExtMeta["container_id"] = container.ID
para.ExtMeta["container_hostname"] = container.Config.Hostname
para.ExtMeta["io_tencent_bcs_container_name"] = container.Config.Labels[ContainerLabelK8sContainerName]
//whether report pod labels to log tags
if logConf.Spec.PodLabels {
for k, v := range pod.Labels {
para.ExtMeta[k] = v
para.ExtMeta[fmt.Sprintf("labels_%s", strings.ReplaceAll(k, ".", "_"))] = v
}
}
//custom log tags
for k, v := range conf.LogTags {
para.ExtMeta[k] = v
para.ExtMeta[fmt.Sprintf("%s", strings.ReplaceAll(k, ".", "_"))] = v
}
// generate std output log collection config
if stdoutDataid == "" && conf.Stdout && conf.StdDataId != "" {
Expand Down Expand Up @@ -648,16 +672,3 @@ func (s *SidecarController) getFirstWildcardPos(str string) int {
}
return pos
}

// getContainerRootPath returns the root path of the given container.
// With the overlay2 driver this is the "MergedDir" entry of the graph-driver
// data, which usually begins with /data/bcs/lib/docker/overlay2/{hashid}/merged.
// For every other driver the path /proc/{pid}/root is returned instead.
func (s *SidecarController) getContainerRootPath(container *docker.Container) string {
	if container.Driver == "overlay2" {
		return container.GraphDriver.Data["MergedDir"]
	}
	// Non-overlay2 drivers: resolve the root through the container's process.
	rootViaProc := fmt.Sprintf("/proc/%d/root", container.State.Pid)
	return rootViaProc
}
13 changes: 13 additions & 0 deletions bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,16 @@ func (s *SidecarController) getActualPath(logPath string, container *docker.Cont
blog.V(3).Infof("origin path: %s, clean path: %s", logPath, retpath)
return retpath, nil
}

// getContainerRootPath returns the host-side directory used as the root of
// the given container's filesystem.
//
// With the overlay2 driver it returns the "UpperDir" entry of the container's
// graph-driver data, i.e. the container's writable layer (typically
// .../overlay2/{hashid}/diff), not the merged mount.
// NOTE(review): presumably UpperDir is used because it stays readable after
// the container has stopped — confirm against the log-collection callers.
//
// For any other driver, or when the container carries no graph-driver data,
// it falls back to /proc/{pid}/root.
func (s *SidecarController) getContainerRootPath(container *docker.Container) string {
	// GraphDriver is a pointer; guard it so a container without that data
	// cannot trigger a nil dereference.
	if container.Driver == "overlay2" && container.GraphDriver != nil {
		return container.GraphDriver.Data["UpperDir"]
	}
	return fmt.Sprintf("/proc/%d/root", container.State.Pid)
}
16 changes: 10 additions & 6 deletions bcs-services/bcs-logbeat-sidecar/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package types

import (
bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1"
"github.com/Tencent/bk-bcs/bcs-services/bcs-logbeat-sidecar/metric"
)

Expand All @@ -26,12 +27,15 @@ type Yaml struct {

// Local is a single log collection task with single dataid
type Local struct {
DataID int `yaml:"dataid"`
OutputFormat string `yaml:"output_format"`
Paths []string `yaml:"paths"`
ToJSON bool `yaml:"to_json"`
Package *bool `yaml:"package,omitempty"`
ExtMeta map[string]string `yaml:"ext_meta"`
DataID int `yaml:"dataid"`
OutputFormat string `yaml:"output_format"`
Paths []string `yaml:"paths"`
ToJSON bool `yaml:"to_json"`
Package bool `yaml:"package"`
ExtMeta map[string]string `yaml:"ext_meta"`
CloseEOF *bool `yaml:"close_eof,omitempty"`
CloseTimeout string `yaml:"close_timeout,omitempty"`
Multiline *bcsv1.MultilineConf `yaml:"multiline,omitempty"`

//stdout dataid
StdoutDataid string `yaml:"-"`
Expand Down

0 comments on commit 4e7862d

Please sign in to comment.