Skip to content

Commit

Permalink
add_kubernetes_metadata supports '/var/log/containers/' log path
Browse files Browse the repository at this point in the history
The add_kubernetes_metadata processor's LogPathMatcher could extract
a Docker container ID - and hence enrich a log document with Kubernetes
metadata - only if the log path was '/var/lib/docker/containers/'.

With this commit, the LogPathMatcher can extract the container ID also
from a '/var/log/containers/' log path (Kubernetes symlinks).
  • Loading branch information
SvenWoltmann committed Aug 25, 2017
1 parent 5613c5f commit e260550
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di

- Add PostgreSQL module with slowlog support. {pull}4763[4763]
- Add Kafka log module. {pull}4885[4885]
- Add support for `/var/log/containers/` log path in `add_kubernetes_metadata` processor. {pull}5011[5011]

*Heartbeat*

Expand Down
40 changes: 32 additions & 8 deletions filebeat/processor/add_kubernetes_metadata/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,44 @@ func newLogsPathMatcher(cfg common.Config) (add_kubernetes_metadata.Matcher, err
return &LogPathMatcher{LogsPath: logPath}, nil
}

// Docker container ID is a 64-character-long hexadecimal string
const containerIdLen = 64

// Minimum `source` lengths are calculated in order to prevent "slice bound out of range"
// runtime errors in case `source` is shorter than expected.
const kubernetesLogPath = "/var/log/containers/"
const kubernetesMinSourceLen = len(kubernetesLogPath) + containerIdLen + 4

const dockerLogPath = "/var/lib/docker/containers/"
const dockerLogPathLen = len(dockerLogPath)
const dockerMinSourceLen = dockerLogPathLen + containerIdLen

func (f *LogPathMatcher) MetadataIndex(event common.MapStr) string {
if value, ok := event["source"]; ok {
source := value.(string)
logp.Debug("kubernetes", "Incoming source value: ", source)
cid := ""
if strings.Contains(source, f.LogsPath) {
//Docker container is 64 chars in length
cid = source[len(f.LogsPath) : len(f.LogsPath)+64]
logp.Debug("kubernetes", "Incoming source value: %s", source)

sourceLen := len(source)

// Variant 1: Kubernetes log path "/var/log/containers/...-${cid}.log"
if strings.HasPrefix(source, kubernetesLogPath) &&
strings.HasSuffix(source, ".log") &&
sourceLen >= kubernetesMinSourceLen {
containerIdEnd := sourceLen - 4
cid := source[containerIdEnd - containerIdLen : containerIdEnd]
logp.Debug("kubernetes", "Using container id: %s", cid)
return cid;
}
logp.Debug("kubernetes", "Using container id: ", cid)

if cid != "" {
return cid
// Variant 2: Docker log path "/var/lib/docker/containers/${cid}/${cid}-json.log"
if strings.HasPrefix(source, dockerLogPath) &&
sourceLen >= dockerMinSourceLen {
cid := source[dockerLogPathLen : dockerLogPathLen + containerIdLen]
logp.Debug("kubernetes", "Using container id: %s", cid)
return cid;
}

logp.Debug("kubernetes", "No container id found in source.")
}

return ""
Expand Down
49 changes: 36 additions & 13 deletions filebeat/processor/add_kubernetes_metadata/indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,47 @@ import (
"github.com/elastic/beats/libbeat/common"
)

func TestLogsPathMatcher(t *testing.T) {
var testConfig = common.NewConfig()
// A random container ID that we use for our tests
const cid = "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"

func TestLogsPathMatcher_InvalidSource1(t *testing.T) {
source := "/var/log/messages"
expectedResult := ""
executeTest(t, source, expectedResult);
}

func TestLogsPathMatcher_InvalidSource2(t *testing.T) {
source := "/var/lib/docker/containers/01234567/89abcdef-json.log"
expectedResult := ""
executeTest(t, source, expectedResult);
}

func TestLogsPathMatcher_InvalidSource3(t *testing.T) {
source := "/var/log/containers/pod_ns_container_01234567.log"
expectedResult := ""
executeTest(t, source, expectedResult);
}

func TestLogsPathMatcher_VarLibDockerContainers(t *testing.T) {
source := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)
expectedResult := cid;
executeTest(t, source, expectedResult);
}

func TestLogsPathMatcher_VarLogContainers(t *testing.T) {
source := fmt.Sprintf("/var/log/containers/kube-proxy-4d7nt_kube-system_kube-proxy-%s.log", cid)
expectedResult := cid;
executeTest(t, source, expectedResult);
}

func executeTest(t *testing.T, source string, expectedResult string) {
var testConfig = common.NewConfig()
logMatcher, err := newLogsPathMatcher(*testConfig)
assert.Nil(t, err)

cid := "0069869de9adf97f574c62029aeba65d1ecd85a2a112e87fbc28afe4dec2b843"
logPath := fmt.Sprintf("/var/lib/docker/containers/%s/%s-json.log", cid, cid)

input := common.MapStr{
"source": "/var/log/messages",
"source": source,
}

output := logMatcher.MetadataIndex(input)
assert.Equal(t, output, "")

input["source"] = logPath
output = logMatcher.MetadataIndex(input)

assert.Equal(t, output, cid)
assert.Equal(t, output, expectedResult)
}

0 comments on commit e260550

Please sign in to comment.