diff --git a/bcs-services/bcs-logbeat-sidecar/go.mod b/bcs-services/bcs-logbeat-sidecar/go.mod index 0824d9faee..cc8b7dd1aa 100644 --- a/bcs-services/bcs-logbeat-sidecar/go.mod +++ b/bcs-services/bcs-logbeat-sidecar/go.mod @@ -8,21 +8,18 @@ replace ( ) require ( - github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210818040851-76fdc539dc33 - github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210518090424-99527484a283 + github.com/Tencent/bk-bcs/bcs-common v0.0.0-20210908080357-99540f892332 + github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs v0.0.0-20210810131039-5220f346d815 github.com/containerd/containerd v1.4.3 // indirect github.com/docker/docker v20.10.0-rc1+incompatible // indirect github.com/fsouza/go-dockerclient v1.6.5 - github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect github.com/imdario/mergo v0.3.11 // indirect github.com/moby/sys/mount v0.2.0 // indirect github.com/moby/term v0.0.0-20201216013528-df9cb8a40635 // indirect - github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_model v0.2.0 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce gopkg.in/yaml.v2 v2.3.0 - gotest.tools/v3 v3.0.3 // indirect k8s.io/api v0.18.5 k8s.io/apiextensions-apiserver v0.18.2 k8s.io/apimachinery v0.18.5 diff --git a/bcs-services/bcs-logbeat-sidecar/sidecar/cache.go b/bcs-services/bcs-logbeat-sidecar/sidecar/cache.go index 6ac1db0e2e..8ba9e1af4e 100644 --- a/bcs-services/bcs-logbeat-sidecar/sidecar/cache.go +++ b/bcs-services/bcs-logbeat-sidecar/sidecar/cache.go @@ -91,7 +91,7 @@ func (s *SidecarController) inspectContainer(ID string) *docker.Container { return c case <-timer.C: cancelFunc() - blog.Errorf("Inspect container %d timeout unexpected, check whether pod is working properly with sharePID mode.", ID) + blog.Errorf("Inspect container %s timeout unexpected, check whether pod is working properly with sharePID mode.", ID) return nil } } diff --git a/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go b/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go index c91b48a221..bc43f69fb3 100644 --- a/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go +++ b/bcs-services/bcs-logbeat-sidecar/sidecar/controller.go @@ -21,6 +21,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/Tencent/bk-bcs/bcs-common/common/blog" bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1" @@ -150,6 +151,20 @@ func (s *SidecarController) listenerDockerEvent() { } }() + freeFunc := func(containerID string) { + s.containerCacheMutex.Lock() + delete(s.containerCache, containerID) + s.containerCacheMutex.Unlock() + s.Lock() + s.deleteContainerLogConf(containerID) + s.Unlock() + err = s.reloadLogbeat() + if err != nil { + blog.Errorf("reload logbeat failed: %s", err.Error()) + } + blog.V(3).Infof("reload logbeat succ") + } + for { var msg *docker.APIEvents select { @@ -160,9 +175,11 @@ func (s *SidecarController) listenerDockerEvent() { switch msg.Action { //start container case "start": + blog.Infof("docker action : %+v", *msg) c := s.inspectContainer(msg.ID) if c == nil { blog.Errorf("inspect container %s failed", msg.ID) + freeFunc(msg.ID) break } s.containerCacheMutex.Lock() @@ -175,29 +192,29 @@ func (s *SidecarController) listenerDockerEvent() { } blog.V(3).Infof("reload logbeat succ") - // destroy container - case "destroy": - s.containerCacheMutex.Lock() - delete(s.containerCache, msg.ID) - s.containerCacheMutex.Unlock() - s.Lock() - s.deleteContainerLogConf(msg.ID) - s.Unlock() - err = s.reloadLogbeat() - if err != nil { - blog.Errorf("reload logbeat failed: %s", err.Error()) + // exit container + case "die", "stop": + blog.Infof("docker action : %+v", *msg) + s.containerCacheMutex.RLock() + c, ok := s.containerCache[msg.ID] + s.containerCacheMutex.RUnlock() + if !ok { + blog.Errorf("Container info with containerID (%s) did not in containerCache", msg.ID) + freeFunc(msg.ID) + break } - blog.V(3).Infof("reload logbeat succ") - // stop container - case "stop": - s.Lock() - s.deleteContainerLogConf(msg.ID) - s.Unlock() + c.State.Running = false + c.State.Dead = true + s.produceContainerLogConf(c) err = s.reloadLogbeat() if err != nil { blog.Errorf("reload logbeat failed: %s", err.Error()) } blog.V(3).Infof("reload logbeat succ") + + // destroy container + case "destroy": + freeFunc(msg.ID) } } } @@ -510,6 +527,7 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe matchedLogConfig.LogPaths = logConf.Spec.LogPaths matchedLogConfig.HostPaths = logConf.Spec.HostPaths matchedLogConfig.LogTags = logConf.Spec.LogTags + matchedLogConfig.Multiline = logConf.Spec.Multiline matchedLogConfigs = append(matchedLogConfigs, &matchedLogConfig) } @@ -535,11 +553,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe Paths: make([]string, 0), ToJSON: true, OutputFormat: s.conf.LogbeatOutputFormat, + Package: logConf.Spec.PackageCollection, + } + blog.Infof("container info: %+v", *container) + if !container.State.Running { + var closeEOF bool = true + para.CloseEOF = &closeEOF + para.CloseTimeout = time.Duration(time.Duration(logConf.Spec.ExitedContainerLogCloseTimeout) * time.Second).String() } - if logConf.Spec.PackageCollection { - pack := new(bool) - *pack = true - para.Package = pack + if conf.Multiline != nil && conf.Multiline.Type != "" { + para.Multiline = conf.Multiline } para.ExtMeta["io_tencent_bcs_cluster"] = logConf.Spec.ClusterId para.ExtMeta["io_tencent_bcs_pod"] = pod.Name @@ -551,15 +574,16 @@ func (s *SidecarController) produceLogConfParameterV2(container *docker.Containe para.ExtMeta["io_tencent_bcs_projectid"] = pod.Labels["io.tencent.paas.projectid"] para.ExtMeta["container_id"] = container.ID para.ExtMeta["container_hostname"] = container.Config.Hostname + para.ExtMeta["io_tencent_bcs_container_name"] = container.Config.Labels[ContainerLabelK8sContainerName] //whether report pod labels to log tags if logConf.Spec.PodLabels { for k, v := range pod.Labels { - para.ExtMeta[k] = v + para.ExtMeta[fmt.Sprintf("labels_%s", strings.ReplaceAll(k, ".", "_"))] = v } } //custom log tags for k, v := range conf.LogTags { - para.ExtMeta[k] = v + para.ExtMeta[fmt.Sprintf("%s", strings.ReplaceAll(k, ".", "_"))] = v } // generate std output log collection config if stdoutDataid == "" && conf.Stdout && conf.StdDataId != "" { @@ -648,16 +672,3 @@ func (s *SidecarController) getFirstWildcardPos(str string) int { } return pos } - -// getContainerRootPath return the root path of the container -// Usually it begins with /data/bcs/lib/docker/overlay2/{hashid}/merged -// If the container does not use OverlayFS, it will return /proc/{procid}/root -func (s *SidecarController) getContainerRootPath(container *docker.Container) string { - switch container.Driver { - case "overlay2": - return container.GraphDriver.Data["MergedDir"] - default: - // blog.Warnf("Container %s has driver %s not overlay2", container.ID, container.Driver) - return fmt.Sprintf("/proc/%d/root", container.State.Pid) - } -} diff --git a/bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go b/bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go index c4be3a798c..09cd564e9e 100644 --- a/bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go +++ b/bcs-services/bcs-logbeat-sidecar/sidecar/logconfop_linux.go @@ -78,3 +78,16 @@ func (s *SidecarController) getActualPath(logPath string, container *docker.Cont blog.V(3).Infof("origin path: %s, clean path: %s", logPath, retpath) return retpath, nil } + +// getContainerRootPath return the root path of the container +// Usually it begins with /data/bcs/lib/docker/overlay2/{hashid}/merged +// If the container does not use OverlayFS, it will return /proc/{procid}/root +func (s *SidecarController) getContainerRootPath(container *docker.Container) string { + switch container.Driver { + case "overlay2": + return container.GraphDriver.Data["UpperDir"] + default: + // blog.Warnf("Container %s has driver %s not overlay2", container.ID, container.Driver) + return fmt.Sprintf("/proc/%d/root", container.State.Pid) + } +} diff --git a/bcs-services/bcs-logbeat-sidecar/types/types.go b/bcs-services/bcs-logbeat-sidecar/types/types.go index 94c9f49e5a..b0c0ac434f 100644 --- a/bcs-services/bcs-logbeat-sidecar/types/types.go +++ b/bcs-services/bcs-logbeat-sidecar/types/types.go @@ -14,6 +14,7 @@ package types import ( + bcsv1 "github.com/Tencent/bk-bcs/bcs-k8s/kubebkbcs/apis/bkbcs/v1" "github.com/Tencent/bk-bcs/bcs-services/bcs-logbeat-sidecar/metric" ) @@ -26,12 +27,15 @@ type Yaml struct { // Local is a single log collection task with single dataid type Local struct { - DataID int `yaml:"dataid"` - OutputFormat string `yaml:"output_format"` - Paths []string `yaml:"paths"` - ToJSON bool `yaml:"to_json"` - Package *bool `yaml:"package,omitempty"` - ExtMeta map[string]string `yaml:"ext_meta"` + DataID int `yaml:"dataid"` + OutputFormat string `yaml:"output_format"` + Paths []string `yaml:"paths"` + ToJSON bool `yaml:"to_json"` + Package bool `yaml:"package"` + ExtMeta map[string]string `yaml:"ext_meta"` + CloseEOF *bool `yaml:"close_eof,omitempty"` + CloseTimeout string `yaml:"close_timeout,omitempty"` + Multiline *bcsv1.MultilineConf `yaml:"multiline,omitempty"` //stdout dataid StdoutDataid string `yaml:"-"`